Apama  10.15.4.0
sag_connectivity_plugins.hpp
Go to the documentation of this file.
1 /*
2  * Title: sag_connectivity_plugins.hpp
3  * Description: C++ API for writing connectivity plugins
4  * $Copyright (c) 2015-2023 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.$
5  * Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG
6  */
7 
16 #ifndef _SAG_CONNECTIVITY_PLUGINS_HPP_
17 #define _SAG_CONNECTIVITY_PLUGINS_HPP_
18 
19 #include <sag_connectivity_cpp.hpp>
20 #include <sag_plugin_logging.hpp>
21 #include <memory>
22 #include <mutex>
23 
24 // must define __STDC_FORMAT_MACROS before first include of inttypes else printf macros won't be defined
25 #ifndef __STDC_FORMAT_MACROS
26 #define __STDC_FORMAT_MACROS 1
27 #endif
28 #include <inttypes.h>
29 
30 namespace com {
31 namespace softwareag {
32 namespace _DATAT_INTERNAL_CPP_NAMESPACE {
33 
34 namespace
35 {
39  void replace(std::string &input, const std::string &from, const std::string &to)
40  {
41  size_t pos = 0;
42  while((pos = input.find(from, pos)) != std::string::npos)
43  {
44  input.replace(pos, from.length(), to);
45  pos += to.length();
46  }
47  }
48 
52  std::string to_string(const Message &m, bool truncate=true)
53  {
54  std::string payload = to_string(m.getPayload());
55  if (truncate && payload.length() > 200) payload = payload.substr(0, 196) + " ...";
56  // security sanitization to prevent log message faking
57  replace(payload, "\n", "\\n");
58  replace(payload, "\r", "\\r");
59  return "Message<metadata="+to_string(m.getMetadataMap())+", payload="+payload+">";
60  }
61 }
62 
68 enum class Direction {
72  TOWARDS_HOST = 1,
77 
78 };
79 
84 class PluginHost {
85  friend class Plugin;
86 public:
87  typedef std::unique_ptr<PluginHost> ptr_t;
88 
103  void enableReliability(Direction direction) {
104  if (SAG_ERROR_OK != sag_enable_reliability(chain, static_cast<int>(direction))) {
105  throw std::runtime_error("An error occurred while setting chain reliability");
106  }
107  }
108 
123  if (SAG_ERROR_OK != sag_enable_queue_flush_messages(chain)) {
124  throw std::runtime_error("An error occurred while enabling queue flush messages");
125  }
126  }
127 
129  bool isShuttingDown() {
130  bool isShuttingDown = false;
131  if (SAG_ERROR_OK != sag_is_host_shutting_down(chain, isShuttingDown)) {
132  throw std::runtime_error("An error occurred while checking if host is shutting down");
133  }
134  return isShuttingDown;
135  }
136 private:
140  PluginHost(void* chain = nullptr) :chain(chain) {}
141  void* chain;
142 };
143 
144 // forward decl for parameters friend
145 class Plugin;
146 
157 {
158  // for constructor access to connectivityManager
159  friend class Plugin;
160 
161 public:
163 
165 
170  const map_t &getConfig() const { return config; }
171 
175  const std::string &getChainId() const { return chainId; }
176 
180  const std::string &getPluginName() const { return pluginName; }
181 
182 protected:
186  PluginConstructorParameters(const std::string &pluginName, const std::string &chainId, const map_t &config, void* connectivityManager, void* chain)
187  : chainId(chainId), pluginName(pluginName), config(config.copy()), connectivityManager(connectivityManager), chain(chain)
188  {}
189 
190 private:
191  std::string chainId;
192  std::string pluginName;
193  map_t config;
195  void* connectivityManager;
197  void* chain;
198 };
199 
202 {
203 public:
207  TransportConstructorParameters(const std::string &pluginName, const std::string &chainId, const map_t &config, void* connectivityManager, void* reserved)
208  : PluginConstructorParameters(pluginName, chainId, config, connectivityManager, reserved)
209  {}
210 };
211 
214 {
215 public:
219  CodecConstructorParameters(const std::string &pluginName, const std::string &chainId, const map_t &config, void* connectivityManager, void* reserved)
220  : PluginConstructorParameters(pluginName, chainId, config, connectivityManager, reserved)
221  {}
225  CodecConstructorParameters(const std::string &pluginName)
226  : PluginConstructorParameters(pluginName, "", map_t{}, nullptr, nullptr)
227  {}
228 };
229 
230 
241 {
242 public:
243 
247  static const char* STATUS_ONLINE() { return "ONLINE"; }
251  static const char* STATUS_STARTING() { return "STARTING"; }
255  static const char* STATUS_FAILED() { return "FAILED"; }
256 
267  {
268  friend class StatusReporter;
269  public:
270 
271  ~StatusItem()
272  {
273  sag_delete_user_status_item(connectivityManager, underlying);
274  underlying.item = nullptr;
275  }
276 
284  void setStatus(const std::string &value)
285  {
286  std::unique_lock<std::mutex> ul(status_lock);
287  setStatusLocked(value);
288  }
289 
298  void setStatus(int64_t value)
299  {
300  std::unique_lock<std::mutex> ul(status_lock);
301  intValue = value;
302  setStatusLocked(convert_to_details::integerToString(value));
303  }
304 
312  std::string getStatus() {
313  std::unique_lock<std::mutex> ul(status_lock);
314  return lastValue;
315  }
316 
324  void increment(int64_t incrementValue = 1)
325  {
326  std::unique_lock<std::mutex> ul(status_lock);
327  intValue += incrementValue;
328  setStatusLocked(convert_to_details::integerToString(intValue));
329  }
330 
333  const std::string &key() { return mkey; }
334 
335  private:
336  StatusItem(const StatusItem& other) = delete; // non construction-copyable
337  StatusItem& operator=(const StatusItem&) = delete; // non copyable
338 
339  StatusItem(void* connectivityManager, const std::string &key, const std::string &initialValue, const int64_t intValue)
340  : intValue(intValue),
341  mkey(key),
342  lastValue(initialValue),
343  connectivityManager(connectivityManager),
344  underlying(sag_create_user_status_item(connectivityManager, key.c_str(), initialValue.c_str()))
345  {
346  if (!underlying.item)
347  {
348  std::ostringstream oss;
349  oss << "Failed to create status item '" << key << "' (ensure the key is unique and that this plug-in has not been shutdown already)";
350  throw std::runtime_error(oss.str());
351  }
352  }
353 
354  void setStatusLocked(const std::string &value)
355  {
356  if (value == lastValue) return; // no-op this case
357  lastValue = value;
358 
359  sag_set_user_status_item(underlying, value.c_str());
360  }
361 
362  int64_t intValue;
363  const std::string mkey;
364  std::string lastValue;
365  void* connectivityManager;
366  sag_status_item_t underlying;
367  std::mutex status_lock;
368  };
369 
371  typedef std::unique_ptr<StatusItem> item_ptr;
372 
387  item_ptr createStatusItem(const std::string &key, const std::string &initialValue)
388  {
389  return std::unique_ptr<StatusItem>(new StatusItem(connectivityManager, key, initialValue, 0));
390  }
391 
405  item_ptr createStatusItem(const std::string &key, int64_t initialValue)
406  {
407  std::ostringstream oss;
408  oss << initialValue;
409  return std::unique_ptr<StatusItem>(new StatusItem(connectivityManager, key, oss.str(), initialValue));
410  }
411 
423  void setStatus(const map_t &statusmap)
424  {
425  sag_set_user_status_map(connectivityManager, reinterpret_cast<const sag_underlying_map_t&>(statusmap));
426  // keep track of what keys we've added so we can automatically remove them when we're destroyed
427  for (auto it = statusmap.cbegin(); it != statusmap.cend(); it++)
428  if (it->first.type_tag() == SAG_DATA_STRING) // ignore invalid ones
429  {
430  if (it->second.empty())
431  mapKeysToCleanup.erase(it->first);
432  else
433  mapKeysToCleanup.insert(it->first.copy(), data_t());
434  }
435  }
436 
437 
438 
442  ~StatusReporter()
443  {
444  if (!mapKeysToCleanup.empty())
445  sag_set_user_status_map(connectivityManager, reinterpret_cast<const sag_underlying_map_t&>(mapKeysToCleanup));
446  }
447 
451  explicit StatusReporter(void *connectivityManager) : connectivityManager(connectivityManager), mapKeysToCleanup()
452  {
453  }
454 
455 private:
456  void* connectivityManager;
457  map_t mapKeysToCleanup;
458 
459 
460  StatusReporter() = delete;
461 
462  // non-copyable, due to the cleanup code we don't want people to shoot themselves in the foot by having multiple copies
463  StatusReporter(const StatusReporter& other) = delete;
464  StatusReporter& operator=(const StatusReporter&) = delete;
465 
466 };
467 
475 class Plugin
476 {
477 public:
478  /* Constructor.
479  * @since 9.12.0.1
480  */
481  Plugin(const PluginConstructorParameters &params)
482  : pluginName(params.getPluginName()), chainId(params.getChainId()), config(params.getConfig().copy()),
483  host(new PluginHost(params.chain)), logger("connectivity." + pluginName + "." + chainId),
484  statusReporter(new StatusReporter(params.connectivityManager))
485  {
486  }
487 
493  virtual ~Plugin() {}
494 
505  virtual void start() {}
506 
514  virtual void hostReady() {}
515 
540  virtual void shutdown() {}
542  const std::string &getName() const { return pluginName; }
543 protected:
547  const std::string pluginName;
549  const std::string chainId;
557  const PluginHost::ptr_t host;
566  if (statusReporter) return *statusReporter;
567  throw std::runtime_error("Cannot call getStatusReporter when using the legacy constructor");
568  }
569 
570 public:
575 private:
576  std::unique_ptr<StatusReporter> statusReporter;
577 };
578 
583 class HostSide
584 {
585 public:
587  virtual ~HostSide() {}
589  typedef std::unique_ptr<HostSide> ptr_t;
609  virtual void sendBatchTowardsHost(Message *start, Message *end) = 0;
610 
618  void sendBatchTowardsHost(Message &&message) {
619  sendBatchTowardsHost(&message, &message+1);
620  }
621 
632  template<typename IT>
633  auto sendBatchTowardsHost(const IT &begin, const IT &end) -> typename std::enable_if<
634  !std::is_const<ampl::remove_ref_t<decltype(*begin)>>::value &&
635  ampl::is_same<Message, ampl::remove_const_t<ampl::remove_ref_t<decltype(*begin)>>>::value
636  , void>::type // this ensures we can only pass in non-const iterator pairs to Message
637  {
638  if(begin == end) sendBatchTowardsHost((Message*) nullptr, (Message*) nullptr);
639  else sendBatchTowardsHost(&(*begin), (&(*(end-1)))+1);
640  }
641 };
642 
647 class RemoteHostSide: public HostSide
648 {
649 public:
650  RemoteHostSide(sag_plugin_t other, sag_send_fn_t fn): other(other), fn(fn) {}
651  virtual ~RemoteHostSide() {}
652  virtual void sendBatchTowardsHost(Message *start, Message *end);
653 private:
654  sag_plugin_t other;
655  sag_send_fn_t fn;
656 };
657 
663 {
664 public:
666  virtual ~TransportSide() {}
668  typedef std::unique_ptr<TransportSide> ptr_t;
689  virtual void sendBatchTowardsTransport(Message *start, Message *end) = 0;
690 
699  sendBatchTowardsTransport(&message, &message+1);
700  }
701 
711  template<typename IT>
712  auto sendBatchTowardsTransport(const IT &begin, const IT &end) -> typename std::enable_if<
713  !std::is_const<ampl::remove_ref_t<decltype(*begin)>>::value &&
714  ampl::is_same<Message, ampl::remove_const_t<ampl::remove_ref_t<decltype(*begin)>>>::value
715  , void>::type // this ensures we can only pass in non-const iterator pairs to Message
716  {
717  if(begin == end) sendBatchTowardsTransport((Message*) nullptr, (Message*) nullptr);
718  else sendBatchTowardsTransport(&(*begin), (&(*(end-1)))+1);
719  }
720 };
721 
726 class RemoteTransportSide: public TransportSide
727 {
728 public:
729  RemoteTransportSide(sag_plugin_t other, sag_send_fn_t fn): other(other), fn(fn) {}
730  virtual ~RemoteTransportSide() {}
731  virtual void sendBatchTowardsTransport(Message *start, Message *end);
732 private:
733  sag_plugin_t other;
734  sag_send_fn_t fn;
735 };
736 
737 
759 class AbstractCodec: public Plugin, public HostSide, public TransportSide
760 {
761 public:
763 
773  : Plugin(params)
774  {}
775 
776  // These methods do not need to show up in doxygen
777  /* Called between construction and start() to set the hostSide field */
778  virtual void setNextTowardsHost(HostSide::ptr_t &&host)
779  {
780  hostSide = std::move(host);
781  }
782  /* Called between construction and start() to set the transportSide field */
783  virtual void setNextTowardsTransport(TransportSide::ptr_t &&transport)
784  {
785  transportSide = std::move(transport);
786  }
787 protected:
793 
799 };
800 
830 {
831 public:
833 
843  : Plugin(params)
844  {}
845 
846  // This method does not need to show up in doxygen
847  /* Called between construction and start() to set the hostSide field */
848  virtual void setNextTowardsHost(HostSide::ptr_t &&host)
849  {
850  hostSide = std::move(host);
851  }
852 
853 protected:
859 };
860 
873 {
874 public:
875 
885  : AbstractTransport(params)
886  {}
887 
888 
894  virtual void sendBatchTowardsTransport(Message *start, Message *end)
895  {
896  for (Message *it = start; it != end; ++it) {
897  try {
898  if (it->getPayload().empty()) {
899  deliverNullPayloadTowardsTransport(*it);
900  } else {
901  deliverMessageTowardsTransport(*it);
902  }
903  } catch (...) {
904  handleException(*it);
905  }
906  }
907  }
909  virtual void deliverMessageTowardsTransport(Message &msg) = 0;
912  {
913  // do nothing
914  }
915 
934  virtual void handleException(Message &m)
935  {
936  try {
937  throw;
938  } catch (const std::exception &e) {
939  logger.warn("Error while delivering message: %s; %s will be dropped.", e.what(), to_string(m).c_str());
940  } catch (...) {
941  logger.warn("Unknown error delivering message: %s", to_string(m).c_str());
942  }
943  }
944 };
945 
962 {
963 public:
964 
974  : AbstractCodec(params)
975  {}
976 
983  virtual void sendBatchTowardsHost(Message *start, Message *end)
984  {
985  Message *curr = start;
986  for (Message *it = start; it != end; ++it) {
987  bool rv;
988  try {
989  if (it->getPayload().empty()) {
990  rv = transformNullPayloadTowardsHost(*it);
991  } else {
992  rv = transformMessageTowardsHost(*it);
993  }
994  } catch (...) {
995  rv = handleException(*it, false);
996  }
997  // if we keep it (and it didn't throw) swap it with the accumulator
998  if (rv) {
999  if (it != curr) it->swap(std::move(*curr)); // don't self-swap
1000  ++curr;
1001  }
1002  }
1003  if (hostSide.get()) hostSide->sendBatchTowardsHost(start, curr);
1004  }
1011  virtual void sendBatchTowardsTransport(Message *start, Message *end)
1012  {
1013  Message *curr = start;
1014  for (Message *it = start; it != end; ++it) {
1015  bool rv;
1016  try {
1017  // process the message
1018  if (it->getPayload().empty()) {
1019  rv = transformNullPayloadTowardsTransport(*it);
1020  } else {
1021  rv = transformMessageTowardsTransport(*it);
1022  }
1023  } catch (...) {
1024  rv = handleException(*it, true);
1025  }
1026  // if we keep it (and it didn't throw) swap it with the accumulator
1027  if (rv) {
1028  if (it != curr) it->swap(std::move(*curr)); // don't self-swap
1029  ++curr;
1030  }
1031  }
1032  if (transportSide.get()) transportSide->sendBatchTowardsTransport(start, curr);
1033  }
1034 
1039  virtual bool transformMessageTowardsHost(Message &msg) = 0;
1044  virtual bool transformMessageTowardsTransport(Message &msg) = 0;
1050  {
1051  // do nothing
1052  return true;
1053  }
1059  {
1060  // do nothing
1061  return true;
1062  }
1084  virtual bool handleException(Message &m, bool towardsTransport)
1085  {
1086  try {
1087  throw;
1088  } catch (const std::exception &e) {
1089  logger.warn("Error while transforming message: %s; %s will be dropped.", e.what(), to_string(m).c_str());
1090  } catch (...) {
1091  logger.warn("Unknown error transforming message: %s", to_string(m).c_str());
1092  }
1093  return false;
1094  }
1095 };
1096 
1097 }
1098 
1099 namespace connectivity { using namespace _DATAT_INTERNAL_CPP_NAMESPACE; }
1100 
1101 }} // com.softwareag.connectivity
1102 
1103 // internal implementation included from these files
1104 #include <sag_internal/exception.hpp>
1105 #include <sag_internal/remote_plugins.hpp>
1106 #include <sag_internal/plugin_macros.hpp>
1107 
1117 #define SAG_DECLARE_CONNECTIVITY_TRANSPORT_CLASS(Class) _SAG_DECLARE_CONNECTIVITY_TRANSPORT_CLASS(Class)
1118 
1128 #define SAG_DECLARE_CONNECTIVITY_CODEC_CLASS(Class) _SAG_DECLARE_CONNECTIVITY_CODEC_CLASS(Class)
1129 
1130 #endif // _SAG_CONNECTIVITY_PLUGINS_HPP_
A container for parameters passed to the constructor of a codec plug-in.
Definition: sag_connectivity_plugins.hpp:213
const std::string pluginName
The name used for this plug-in in the configuration file.
Definition: sag_connectivity_plugins.hpp:547
Base class that simplifies implementation of codec plug-ins that deal only with individual messages n...
Definition: sag_connectivity_plugins.hpp:961
AbstractTransport(const PluginConstructorParameters::TransportConstructorParameters &params)
Constructor.
Definition: sag_connectivity_plugins.hpp:842
Base class that simplifies implementation of transport plug-ins that deal only with individual messag...
Definition: sag_connectivity_plugins.hpp:872
void increment(int64_t incrementValue=1)
Set an integer status value by incrementing the previous integer value that was set by this object.
Definition: sag_connectivity_plugins.hpp:324
HostSide::ptr_t hostSide
The next plug-in in the chain towards host.
Definition: sag_connectivity_plugins.hpp:792
An interface to the next component (plugin or host) towards the host.
Definition: sag_connectivity_plugins.hpp:583
const_iterator cbegin() const
Forward const_iterator begin.
Definition: sag_connectivity_cpp.hpp:313
map_t config
The configuration of this plug-in.
Definition: sag_connectivity_plugins.hpp:551
item_ptr createStatusItem(const std::string &key, int64_t initialValue)
Creates a StatusItem instance that can be used to report status for a given key, using an integral in...
Definition: sag_connectivity_plugins.hpp:405
Direction
The enumeration indicating the direction of message flow - towards the transport or towards the host.
Definition: sag_connectivity_plugins.hpp:68
Class for writing to the system logger.
Definition: sag_plugin_logging.hpp:71
const PluginHost::ptr_t host
Interface to support miscellaneous requests from this plug-in to the host system.
Definition: sag_connectivity_plugins.hpp:557
StatusReporter & getStatusReporter()
Allows reporting status information from this plug-in, such as online or failed status and number of ...
Definition: sag_connectivity_plugins.hpp:565
const_iterator cend() const
Forward const_iterator end.
Definition: sag_connectivity_cpp.hpp:315
Contains the C++ implementation of the underlying datatypes used by connectivity plugins and their ac...
void setStatus(const map_t &statusmap)
Set multiple related string status values at the same time (atomically).
Definition: sag_connectivity_plugins.hpp:423
virtual void sendBatchTowardsTransport(Message *start, Message *end)
Implements batch sending, calling deliverMessageTowardsTransport(Message&) for each message individua...
Definition: sag_connectivity_plugins.hpp:894
Base of the inheritance tree for Connectivity plugins.
Definition: sag_connectivity_plugins.hpp:475
TransportSide::ptr_t transportSide
The next plug-in in the chain towards transport.
Definition: sag_connectivity_plugins.hpp:798
std::unique_ptr< StatusItem > item_ptr
Unique pointer to a StatusItem.
Definition: sag_connectivity_plugins.hpp:371
void setStatus(const std::string &value)
Set a string status value.
Definition: sag_connectivity_plugins.hpp:284
AbstractSimpleTransport(const PluginConstructorParameters::TransportConstructorParameters &params)
Constructor.
Definition: sag_connectivity_plugins.hpp:884
Logger logger
Logging for writing to the host log file.
Definition: sag_connectivity_plugins.hpp:574
virtual bool transformNullPayloadTowardsTransport(Message &msg)
Transform a message with a null payload in a transport-wards direction.
Definition: sag_connectivity_plugins.hpp:1058
virtual void start()
Called when an entire chain has been created and the plugin is allowed to start up (after all plugins...
Definition: sag_connectivity_plugins.hpp:505
auto sendBatchTowardsTransport(const IT &begin, const IT &end) -> typename std::enable_if< !std::is_const< ampl::remove_ref_t< decltype(*begin)>>::value &&ampl::is_same< Message, ampl::remove_const_t< ampl::remove_ref_t< decltype(*begin)>>>::value, void >::type
Overload for sending messages using an iterator range.
Definition: sag_connectivity_plugins.hpp:712
std::enable_if< get_underlying< T >::value, std::string >::type to_string(const T &t)
Get a string representation of t.
A class allowing a plug-in to report status values to the host.
Definition: sag_connectivity_plugins.hpp:240
std::unique_ptr< TransportSide > ptr_t
Pointers to TransportSides should always be this ptr_t type, which is a std::unique_ptr.
Definition: sag_connectivity_plugins.hpp:668
virtual ~TransportSide()
Ensure virtual destruction.
Definition: sag_connectivity_plugins.hpp:666
Base class for transport plug-ins.
Definition: sag_connectivity_plugins.hpp:829
static const char * STATUS_ONLINE()
Returns a constant that should be used as the status value when a component is online,...
Definition: sag_connectivity_plugins.hpp:247
item_ptr createStatusItem(const std::string &key, const std::string &initialValue)
Creates a StatusItem instance that can be used to report status for a given key.
Definition: sag_connectivity_plugins.hpp:387
const std::string & getName() const
The name used for this plug-in in the configuration file.
Definition: sag_connectivity_plugins.hpp:542
bool isShuttingDown()
Check if host is shutting down.
Definition: sag_connectivity_plugins.hpp:129
virtual void handleException(Message &m)
Handle an exception thrown while delivering a message towards the transport.
Definition: sag_connectivity_plugins.hpp:934
void enableQueueFlushMessages()
Enable notifications about queue flushes.
Definition: sag_connectivity_plugins.hpp:122
static const char * STATUS_FAILED()
Returns a constant that should be used as the status value when a component is not currently operatio...
Definition: sag_connectivity_plugins.hpp:255
virtual ~HostSide()
Ensure virtual destruction.
Definition: sag_connectivity_plugins.hpp:587
AbstractSimpleCodec(const PluginConstructorParameters::CodecConstructorParameters &params)
Constructor.
Definition: sag_connectivity_plugins.hpp:973
virtual void hostReady()
Called some time after start(), when the host is ready to start receiving input (sends will be queued...
Definition: sag_connectivity_plugins.hpp:514
Base class for codec plug-ins.
Definition: sag_connectivity_plugins.hpp:759
void sendBatchTowardsHost(Message &&message)
Overload for sending a batch containing a single message.
Definition: sag_connectivity_plugins.hpp:618
const std::string & getPluginName() const
Get the name used in the configuration file for this plug-in.
Definition: sag_connectivity_plugins.hpp:180
static const char * STATUS_STARTING()
Returns a constant that should be used as the status value when a component is still starting,...
Definition: sag_connectivity_plugins.hpp:251
void setStatus(int64_t value)
Set an integer status value.
Definition: sag_connectivity_plugins.hpp:298
A map class which implements many of the functions on std::map.
Definition: sag_connectivity_cpp.hpp:35
const std::string & key()
Get the unique key specified when this status item was created.
Definition: sag_connectivity_plugins.hpp:333
A class that can be used to efficiently update the value associated with a single status key.
Definition: sag_connectivity_plugins.hpp:266
A container for an payload and associated metadata.
Definition: sag_connectivity_cpp.hpp:26
virtual void shutdown()
Stop processing messages and terminate and join any background threads.
Definition: sag_connectivity_plugins.hpp:540
virtual void sendBatchTowardsHost(Message *start, Message *end)
Implements batch sending, calling transformMessageTowardsHost(Message &) for each message individuall...
Definition: sag_connectivity_plugins.hpp:983
An interface to the next component (plugin or host) towards the transport.
Definition: sag_connectivity_plugins.hpp:662
AbstractCodec(const PluginConstructorParameters::CodecConstructorParameters &params)
Constructor.
Definition: sag_connectivity_plugins.hpp:772
auto sendBatchTowardsHost(const IT &begin, const IT &end) -> typename std::enable_if< !std::is_const< ampl::remove_ref_t< decltype(*begin)>>::value &&ampl::is_same< Message, ampl::remove_const_t< ampl::remove_ref_t< decltype(*begin)>>>::value, void >::type
Overload for sending messages using an iterator range.
Definition: sag_connectivity_plugins.hpp:633
map_t copy() const
Return a deep copy of this map.
Definition: sag_connectivity_cpp.hpp:242
virtual void deliverNullPayloadTowardsTransport(Message &msg)
Deliver a message with a null payload.
Definition: sag_connectivity_plugins.hpp:911
void enableReliability(Direction direction)
Enable reliable messaging for the chain that this plug-in belongs to, in a particular direction i....
Definition: sag_connectivity_plugins.hpp:103
Contains the headers needed to implement your own Connectivity Plugins.
const std::string & getChainId() const
Get the identifier used for the chain this plug-in is part of.
Definition: sag_connectivity_plugins.hpp:175
A container for parameters passed to the constructor of a transport plug-in.
Definition: sag_connectivity_plugins.hpp:201
The direction of messages flowing towards the host (from the transport).
Interface to support miscellaneous requests from a particular plug-in to the host system.
Definition: sag_connectivity_plugins.hpp:84
A base interface for parameters passed to the constructor of transport or codec plug-ins.
Definition: sag_connectivity_plugins.hpp:156
virtual bool handleException(Message &m, bool towardsTransport)
Handle an exception thrown while delivering a message.
Definition: sag_connectivity_plugins.hpp:1084
utf8-encoded const char*
Definition: sag_connectivity_c.h:45
const std::string chainId
The identifier used for the chain this plug-in is part of.
Definition: sag_connectivity_plugins.hpp:549
The direction of messages flowing towards the transport (from the host).
void sendBatchTowardsTransport(Message &&message)
Overload for sending a batch containing a single message.
Definition: sag_connectivity_plugins.hpp:698
std::string getStatus()
Return the value this status item was set to most recently by this class.
Definition: sag_connectivity_plugins.hpp:312
A variant type which can be one of the following:
Definition: sag_connectivity_cpp.hpp:41
std::unique_ptr< HostSide > ptr_t
Pointers to HostSides should always be this ptr_t type, which is a std::unique_ptr.
Definition: sag_connectivity_plugins.hpp:589
virtual void sendBatchTowardsTransport(Message *start, Message *end)
Implements batch sending, calling transformMessageTowardsTransport(Message &) for each message indivi...
Definition: sag_connectivity_plugins.hpp:1011
const map_t & getConfig() const
Get the configuration for this plug-in.
Definition: sag_connectivity_plugins.hpp:170
virtual ~Plugin()
This destructor must be virtual.
Definition: sag_connectivity_plugins.hpp:493
HostSide::ptr_t hostSide
The next plug-in in the chain towards host.
Definition: sag_connectivity_plugins.hpp:858
virtual bool transformNullPayloadTowardsHost(Message &msg)
Transform a message with a null payload in a host-wards direction.
Definition: sag_connectivity_plugins.hpp:1049