apama.correlator module

Contains PySys extensions for starting and interacting with correlator processes.

class apama.correlator.CorrelatorHelper(parent, port=None, host=None, name='correlator')[source]

Bases: apama.common.ApamaServerProcess

Class for an instance of an Apama Correlator.

This class provides the ability to configure, start and interact with a Correlator. For example:

correlator = CorrelatorHelper(self, name='mycorrelator')
correlator.start(logfile="mycorrelator.log")
correlator.inject(filenames=["simple.mon"])
Variables:
  • parent (pysys.basetest.BaseTest) – Reference to the PySys testcase instantiating this class instance
  • port (integer) – Port used for starting and interaction with the Correlator
  • host (string) – Hostname for interaction with a remote Correlator
  • environ (dict) – The environment for running the Correlator
WATCH_COLUMNS = ['Uptime (ms)', '# Contexts', '# Monitors', '# Sub-monitors', '# Java apps', '# Listeners', '# Sub-listeners', '# Event types', 'Input queue', '# Received events', 'Route queue', '# Routed events', '# Consumers', 'Output queue', '# Created output events', '# Sent output events', '# Processed events', 'Slowest context name', 'Slowest context queue size', 'Slowest receiver', 'Slowest receiver queue']
__init__(parent, port=None, host=None, name='correlator')[source]

Create an instance of the CorrelatorHelper class.

If no port parameter is used in the argument list an available port will be dynamically found from the OS and used for starting the Correlator, and performing all operations against it. The host parameter is only used to perform operations against a remote Correlator started external to the PySys framework - the class does not support the starting of a Correlator remote to the localhost.

Parameters:
  • parent – Reference to the parent PySys testcase
  • port – The port used for starting and interacting with the Correlator
  • host – The hostname used for interaction with a remote Correlator
  • name – A display name for this process (default is “correlator”), also used for the default stdout/err filenames.
addToClassPath(path)[source]

Add the supplied path to the Java classpath variable for starting this instance.

addToPath(path)[source]

Add the supplied path to the PATH (on Windows) or LD_LIBRARY_PATH (on Unix) environment variable for starting this instance.

applicationEventLogging(enable=True, **xargs)[source]

Enable and disable application event logging.

Provides a wrapper around the engine_management command line tool to enable and disable application event logging. Once enabled, application event logging will log to the correlator log file information specific processing occurrences, e.g. the receipt of events for processing, the triggering of listeners, execution of the garbage collector etc.

Parameters:
  • enable – Set to True to enable, set to False to disable event logging
  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
classpath = None

Holds the Java classpath used when starting a correlator with JVM.

connect(source, channel=None, channels=None, mode=None, **xargs)[source]

Connect a Correlator to this instance as a source.

Parameters:
  • source – An instance of the CorrelatorHelper class to act as the source
  • channel – The channel to make the connection on
  • channels – The list of channels to make the connection on
  • mode – The connection mode - ‘legacy’ or ‘parallel’; parallel uses a connection per channel
  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
delete(names=[], filename=None, filedir=None, force=False, kill=False, all=False, utf8=False, **xargs)[source]

Delete named objects from the Event Crrelator.

Parameters:
  • names – List of names to delete from the Correlator
  • filename – The basename of a file containing a set of names to delete
  • filedir – The directory containing filename (defaults to testcase input subdirectory)
  • force – Force deletion of names even if they are in use
  • kill – Kill name even if it is a running monitor
  • all – Delete everything in the Correlator
  • utf8 – Assume input is in UTF8
  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
disconnect(source, channel=None, channels=None, mode=None, **xargs)[source]

Disconnect a correlator to this instance as a source correlator.

Parameters:
  • source – An instance of the CorrelatorHelper class acting as the source
  • channel – The channel to be disconnected
  • channels – The list of channels to be disconnected
  • mode – The connection mode - ‘legacy’ or ‘parallel’; parallel uses a connection per channel
  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
flush(timeout=60, count=1, **xargs)[source]

Make sure all events have been flushed through the correlator.

Currently implemented by using the flushAllQueues management request. Will initate a cycle where each queue in the correlator is drained, optionally repeated count times. This is useful when you have a multi-context application.

Parameters:
  • timeout – The amount of time to wait
  • count – The number of times to ensure queues are flushed
  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
hasLicence()[source]

Does this correlator instance have access to a licence file?

initialize(path, correlatorName=None, properties=None, include=None, exclude=None, **xargs)[source]

Initialize the Correlator by injecting all the files making up the project, typically based on a Designer launch configuration .deploy file.

This is usually the simplest way to inject all the files from an application into the correlator. Alternative approaches are to call the injectEPL and related methods individually for each file, or to specify the files in the “initialization” section of a yaml file passed into the correlator start call using the “config” argument.

Queries and Digital Event Services .mon files will be generated automatically as part of injection, but any Java jar files must be compiled manually before invoking this method.

Parameters:
  • path – Path of a .deploy file from Designer (recommended), a directory, or a text file listing the files to be injected. Must be an absolute path, or relative to the testcase output dir.
  • correlatorName – The name of the Correlator as specified in the launch configuration .deploy file, e.g “defaultCorrelator”. If not specified, the name of this pysys correlator will be used.
  • properties – Optional path to a .properties file specifying ${var} placeholders to be used for resolving the paths of any files outside the project directory. Absolute path or relative to output dir.
  • include – a comma-separated string specifying which of the project files found by the tool should be injected, e.g. **/foo/Bar*.evt,**.mon. If not specified, all files will be included (unless specifically excluded)
  • exclude – a comma-separated string specifying which of the project files found by the tool should NOT be injected, e.g. **/foo/Bar*.evt.
  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
injectCDP(filenames=[], filedir=None, **xargs)[source]

Inject Correlator deployment package into the Correlator.

See also initialize.

Parameters:
  • filenames – List of the basename of cdp files to inject into the Correlator
  • filedir – Directory containing the input cdp files (defaults to testcase input directory)
  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
injectEPL(filenames=[], filedir=None, utf8=False, **xargs)[source]

Inject EPL *.mon files into the Correlator.

See also initialize.

Parameters:
  • filenames – List of the basename of EPL files to inject into the Correlator
  • filedir – Directory containing the input EPL files (defaults to testcase input directory)
  • utf8 – Assume input is in UTF8
  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
injectJMON = None

Alias for injectJava

See:injectJava
injectJava(filename, filedir=None, **xargs)[source]

Inject a Java plug-in or application into the Correlator.

See also initialize.

Parameters:
  • filename – The basename of the jar file to inject into the Correlator
  • filedir – The directory containing filename (defaults to testcase input subdirectory)
  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
injectMon = None

Alias for injectEPL

See:injectEPL
injectMonitorscript = None

Alias for injectEPL

See:injectEPL
injectQuery(filename, filedir=None, diagnostics=False, **xargs)[source]

Inject a Query into the Correlator.

See also initialize.

Parameters:
  • filename – The basename of the query file to inject into the Correlator
  • filedir – The directory containing filename (defaults to testcase input subdirectory)
  • diagnostics – Enable runtime diagnostic logging in the query
  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
inspect(filename='inspect.txt', filedir=None, raw=False, **xargs)[source]

Obtain information about what application(s) have been injected into the Correlator and what listeners are in existence.

This runs as a FOREGROUND process.

Parameters:
  • filename – The basename of the file to write the information to, e.g. inspect.txt
  • filedir – The directory to write filename to (defaults to testcase output subdirectory)
  • raw – Use parser-friendly output format
  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
licence = None
profilingGet(filename, filedir=None, **xargs)[source]

Obtain the latest profiling statistics from the Correlator.

Parameters:
  • filename – The basename of the file to write the profiling statistics to
  • filedir – The directory to write filename to (defaults to testcase output subdirectory)
  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
profilingOff(**xargs)[source]

Inform the Correlator to stop collecting profiling statistics.

Parameters:xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
profilingOn(**xargs)[source]

Inform the Correlator to start collecting profiling statistics.

profilingReset(**xargs)[source]

Inform the Correlator to reset it’s collection of profiling statistics.

Parameters:xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
receive(filename=None, filedir=None, channels=[], suppressBatch=True, zeroAtFirstBatch=False, utf8=False, logChannels=False, **xargs)[source]

Attach a receiver to the Correlator.

Returns the process for the receiver.

Parameters:
  • filename – The basename of the file to write events received from the Correlator to
  • filedir – The directory to write filename to (defaults to testcase output subdirectory)
  • channels – List of channel names to subscribe to
  • logChannels – Print the channel each event came from in the output
  • suppressBatch – Do not include BATCH timestamps in the output
  • zeroAtFirstBatch – Measure BATCH timestamps from when the first batch arrived
  • utf8 – Write output in UTF8
  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
send(filenames=[], filedir=None, loop=None, utf8=False, channel=None, **xargs)[source]

Send events from one or more file into the Correlator.

See the documentation for engine_send for more information.

Parameters:
  • filenames – List of the basename of event files to send into the Correlator
  • filedir – Directory containing the input event files (defaults to testcase input directory)
  • loop – Number of times to loop through the input file
  • utf8 – Assume input is in UTF8
  • channel – The channel to which events are to be sent except when specified on a per-event basis. If a channel is not specified for an event and you do not specify this option, the event is delivered to the default channel, which means the event will go to all public contexts.
  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
sendEventStrings(*eventStrings, **xargs)[source]

Send one or more event strings into the Correlator.

This method writes a temporary file containing the specified strings.

See the documentation for engine_send for more information.

Parameters:
  • eventStrings – One or more event strings to be sent to this correlator. May be unicode or UTF-8 byte strings. May include a channel designator. Example: self.sendEventStrings(‘mypackage.Event1()’, ‘mypackage.Event2(“Hello World”)’)
  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
  • channel – The channel to which events are to be sent except when specified on a per-event basis. If a channel is not specified for an event and you do not specify this option, the event is delivered to the default channel, which means the event will go to all public contexts.
setApplicationLogFile(filename=None, filedir=None, **xargs)[source]

Set the application log file name.

On setting the application log file details, the output of all native log commands within EPL will be logged to the designated log file. This allows separation between the log statements written by the Correlator i.e. for status, errors etc, and those generated by the actual application.

Parameters:
  • filename – The basename of the file to write the application log file to
  • filedir – The directory to write filename to (defaults to testcase output subdirectory)
  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
setApplicationLogLevel(verbosity, **xargs)[source]

Set the application log level.

Parameters:
start(logfile=None, verbosity=None, java=False, Xclock=None, environ=None, inputLog=None, waitForServerUp=True, config=None, **xargs)[source]

Start the Correlator.

Parameters:
  • logfile – Name of the Correlator log file (if used, set this to something similar to the display “name” passed to the constructor)
  • verbosity – The verbosity level of the Correlator logging
  • java – If pysys.constants.False then the Correlator will be started with support for JMON applications
  • Xclock – If pysys.constants.True then the Correlator will be started in externally clocked mode
  • environ – Map of environment variables to override.
  • inputLog – Relative or absolute path of file to write input log to, containing all events, EPL and other inputs sent to the correlator. The format of the input log may change without notice so should not be replied up on testcases, however it can be useful to review manually for diagnostic purposes, and the input log can also be used to pass information to customer support in the event of a problem.
  • waitForServerUp – Set to False to disable automatically waiting until the component is ready
  • config – path or list of paths to a initialization or connectivity configuration file or directory containing them
  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
toStringAll(filename, filedir=None, **xargs)[source]

Obtain a stringified representation of the current application state from the Correlator.

Parameters:
  • filename – The basename of the file to write the dump of application state to
  • filedir – The directory to write filename to (defaults to testcase output subdirectory)
  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
waitForCorrelatorUp(*args, **kwargs)[source]

Block until the Correlator declares itself to be ready for processing.

Deprecated:Use waitForComponentUp instead.
Parameters:xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
watch(filename=None, filedir=None, raw=False, interval=None, **xargs)[source]

Obtain runtime operational statistics from the Correlator.

By default this runs as a BACKGROUND process. The process is returned by the method call.

Parameters:
  • filename – The basename of the file to write the runtime operational status to
  • filedir – The directory to write filename to (defaults to testcase output subdirectory)
  • raw – Obtain csv format data when logging to file
  • interval – The polling interval (seconds) between logging to file
  • xargs – Optional pysys.process.user.ProcessUser.startProcess keyword arguments, e.g. arguments, stdouterr, timeout, ignoreExitStatus, workingDir.
Note:

When outputing data in the raw (csv) format, the column identifiers and their positions are defined by WATCH_COLUMNS. Use CorrelatorHelper.WATCH_COLUMNS to look up the column position for a given identifier: