Flushing background queues in connectivity plug-ins
For most connectivity plug-ins, the correlator's flushAllQueues management request waits until all events have been processed by the plug-ins. However, some connectivity plug-ins implement an internal queue, in which case events are only flushed onto the queue, and it does not wait for them to actually complete. To extend flushAllQueues to any hidden queues, there is an additional mechanism that can be implemented in a plug-in. This has been done for all the relevant plug-ins shipped as part of the product.
This is done using a control message, which is a special type of message sent between the host and the transport. Control messages have empty payloads and store all their information in the metadata instead. To enable receiving these control messages, at least one plug-in in the chain must call the enableQueueFlushMessages function on the host pointer:
void start() override {
host->enableQueueFlushMessages();
}
Note:
Flushing is only available for chains that do not use any of the Java-based connectivity plug-ins.
The control message for flushAllQueues has the sag.control.type metadata field set to QueueFlush. It also has an opaque object in the sag.controlObject metadata field. These constants are available as CONTROL_TYPE, CONTROL_TYPE_QUEUE_FLUSH, and CONTROL_OBJECT constants on the Message class.
The control message contains a custom_t object in the CONTROL_OBJECT metadata field. This is completely opaque to the user, but its lifetime controls the flush. It can be copied if there are multiple things to wait for. The flush is complete when all copies of the object have been destroyed. If you do nothing, this happens when the message is fully processed down the chain. To wait for an additional background queue to be flushed, store the object on the queue (either by moving or copying it from the metadata, or by placing the QueueFlush message itself on the queue). When all previous messages on the queue have been processed, then destroy the object that was on the queue.