Active Object#
Active Fabric#
- class miros.activeobject.ActiveFabricSource#
A (pub-sub) event dispatcher for active objects.
- To create it:
af = ActiveFabric() # it is a singleton
- To subscribe (do this before starting it):
client_deque = deque(maxlen=10) event_a = HsmEvent(signal=signals.A) af.subscribe(client_deque, event_a, queue_type=’fifo’)
- To publish with priority:
event_a = HsmEvent(signal=signals.A) af.publish(event_a, priority=1) # where 1 has the highest priorty
# by default the priorty is set to 1000
To be greedy, and ensure your messages take priority over everything, publish with a very high priority and subscribe using the lifo technique:
client_deque = deque(maxlen=10) event_a = HsmEvent(signal=signals.A)
# By subscribing with the ‘lifo’ technique, any publishing event that this # deque item cares about will be placed at the front of the queue af.subscribe(client_deque, event_a, queue_type=’lifo’)
# By publishing with a priority 1, the supporting tasks checking the active # fabric will ignore all queue items of lower priority and dispatch this # event first. af.publish(event_a, priority=1)
Typically the active fabric is started by the first active object that is started. The active fabric is a singleton, so that all active objects can communicate with each other by communicating to it.
The tasks managed by this class can be started and stopped (needed for testing) It does this by NOT inheriting from the threading class, instead it uses the a more primative approach of keeping an internal object that is a thread which can be killed and restarted.
- clear()#
clear out all subscriptions and queues
- publish(event, priority=None)#
publish an event with a given priority to all subscribed queues
Priority of 1 is the highest priority
- start()#
start up the threads which manage the queues and subscription registry.
- stop()#
stop the the threads in such a way that it can be restarted again
- subscribe(queue, event_or_signal, queue_type=None)#
subscribe a queue to an event in a ‘fifo’ or ‘lifo’ way
- There are two different ways to subscribe to an event:
first in first out - fifo priority queue
last in first out - lifo priority queue
Both methods 1 and 2 have tasks with associated priority queues. The fifo/lifo action only takes place at the moment the managing task puts an item into the external queue used as a registry object. For method 1, the task puts the event on the last location in the queue. For method 2, the task puts the event at the beginning location of the queue.
Typically, you would use the same external deque object to subscribe to events in the two different ways provided by this method. This is best explained with an example:
- Example (FIFO):
active_object_input_queue = deque(maxlen=100) # LockingDeque works too event_a = HsmEvent(signal=signals.A) event_b = HsmEvent(signal=signals.B)
# To show are fifo in action we first push something on our deque: active_object_input_queue.append(event_a)
# subscribe to b using the fifo technique af = ActiveFabric() af.subscribe(active_object_input_queue, event_b) # default to fifo af.start() af.publish(event_b)
# show that our old item is still there assert(active_object_input_queue.pop().signal_name == ‘A’) # our new item was at the end of the list assert(active_object_input_queue.pop().signal_name == ‘B’)
- Example (LIFO):
active_object_input_queue = deque(maxlen=100) # LockingDeque works too event_a = HsmEvent(signal=signals.A) event_b = HsmEvent(signal=signals.B)
# To show are lifo in action push something onto our deque: active_object_input_queue.append(event_a)
# subscribe to b using the lifo technique af = ActiveFabric() af.subscribe(active_object_input_queue, event_b, queue_type=’lifo’) af.start() af.publish(event_b)
# show that the ‘lifo’ technique barged our event into the front of the list assert(active_object_input_queue.pop().signal_name == ‘B’) # our old event is still there assert(active_object_input_queue.pop().signal_name == ‘A’)
- thread_runner_fifo(fabric_task_event, fifo_queue, fifo_subscriptions)#
If this was a Thread class this function would be called “run”
This is the main execution code of the thread. It watches to see if the HsmEvent() singleton has been cleared, if it has it exits its forever loop.
- thread_runner_lifo(fabric_task_event, lifo_queue, lifo_subscriptions)#
If this was a Thread class this function would be called “run”
This is the main execution code of the thread. It watches to see if the HsmEvent() singleton has been cleared, if it has it exits its forever loop.
- miros.activeobject.ActiveFabric#
alias of <miros.singleton.SingletonDecorator object>
Active Object#
- class miros.activeobject.ActiveObject(name=None, instrumented=None)#
- append_publish_to_spy()#
instrument the rtc spy with our publish event
- append_subscribe_to_spy()#
instrument the full spy with our subscription request
- cancel_event(uuid=None)#
This will cancel an event thread that was created using the __post_event api. The original call to the __post_event api would have returned the uuid needed to cancel it with this call.
If there are no threads managing the uuid provided, this method will do nothing.
Example:
- post_id_1 = ao.post_fifo(Event(signal=signals.A),
time=15, period=1.0, deferred=True, queue_type=’lifo’)
ao.cancel_event(post_id_1)
- cancel_events(e)#
This will cancel all events that have the same signal name as e, that were posted using the __post_event.
This will cancel all event threads which have the same signal name as ‘e’, that were created using the __post_event api.
If there are no threads managing the signals name within the event provided, this method will do nothing.
Example:
- post_id_1 = ao.post_fifo(Event(signal=signals.A),
time=15, period=1.0, deferred=True, queue_type=’lifo’)
- post_id_2 = ao.post_fifo(Event(signal=signals.A),
time=15, period=1.0, deferred=True, queue_type=’lifo’)
ao.cancel_events(Event(signal=signals.A))
- post_fifo(e, period=None, times=None, deferred=None)#
post an event, or events to the fifo queue
- Example of posting a single event into the fifo queue:
ao.post_fifo(Event(signal=signals.A))
- Example create a short lived thread to post a one-shot event in one second:
thread_id = ao.post_fifo(Event(signal=signals.A), period=1.0, time=1, deferred=True)
# to cancel this one shot ao.cancel_event(thread_id)
- Example to create a heart beat of 0.7 seconds starting in 0.7 seconds:
thread_id = ao.post_fifo(Event(signal=signals.A), period=0.7, deferred=True)
# to kill this thread: ao.cancel_event(thread_id)
- post_lifo(e, period=None, times=None, deferred=None)#
post an event, or events to the lifo queue
- Example of posting a single event into the lifo queue:
ao.post_lifo(Event(signal=signals.A))
- Example create a short lived thread to post a one-shot event in one second:
thread_id = ao.post_lifo(Event(signal=signals.A), period=1.0, time=1, deferred=True)
# to cancel this one shot ao.cancel_event(thread_id)
- Example to create a heart beat of 0.7 seconds starting in 0.7 seconds:
thread_id = ao.post_lifo(Event(signal=signals.A), period=0.7, deferred=True)
# to kill this thread: ao.cancel_event(thread_id)
- run_event(task_event, fabric_task_event, queue)#
The active object threading function.
If this statechart has not been stopped and the active fabric hasn’t been stopped the threading function will run. If the active fabric has been stopped, it will stop this thread as well by clearing the task_event HsmEvent (threading.Event) object.
This threading method waits on the locking-deque. If the signal is not a STOP_ACTIVE_OBJECT_SIGNAL signal it calls the hsm next_rtc method, which will pop the leftmost item out of the deque part of the locking-deque and dispatch it into the hsm.
- start_at(initial_state)#
start the active object at a given state and begin its task
- stop()#
Stops the active object, can cancels all of its pending events
- top(*args)#
top most state given to all HSMs; treat it as an outside function
- trace()#
Output state transition information only:
Example: print(chart.trace())
[05:23:25.314420] [<state_name>] start_at(): top->hsm_queues_graph_g1_s22 [05:23:25.314420] [<state_name>] D->(): hsm_queues_graph_g1_s22->hsm_queues_graph_g1_s1 [05:23:25.314420] [<state_name>] E->(): hsm_queues_graph_g1_s1->hsm_queues_graph_g1_s01 [05:23:25.314420] [<state_name>] F->(): hsm_queues_graph_g1_s01->hsm_queues_graph_g1_s2111 [05:23:25.314420] [<state_name>] A->(): hsm_queues_graph_g1_s2111->hsm_queues_graph_g1_s321