18 #ifndef __IGN_TRANSPORT_NODE_HH_INCLUDED__ 19 #define __IGN_TRANSPORT_NODE_HH_INCLUDED__ 22 #pragma warning(push, 0) 24 #include <google/protobuf/message.h> 35 #include <unordered_set> 71 public:
virtual ~
Node();
79 public:
template<
typename T>
bool Advertise(
const std::string &_topic,
82 std::string fullyQualifiedTopic;
84 this->Options().NameSpace(), _topic, fullyQualifiedTopic))
86 std::cerr <<
"Topic [" << _topic <<
"] is not valid." << std::endl;
90 std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
92 auto currentTopics = this->TopicsAdvertised();
94 if (currentTopics.find(fullyQualifiedTopic) != currentTopics.end())
96 std::cerr <<
"Topic [" << _topic <<
"] already advertised. You cannot" 97 <<
" advertise the same topic twice on the same node." 98 <<
" If you want to advertise the same topic with different" 99 <<
" types, use separate nodes" << std::endl;
104 this->TopicsAdvertised().insert(fullyQualifiedTopic);
108 this->Shared()->myAddress,
109 this->Shared()->myControlAddress,
110 this->Shared()->pUuid, this->NodeUuid(), _options.Scope(),
113 if (!this->Shared()->discovery->AdvertiseMsg(publisher))
115 std::cerr <<
"Node::Advertise(): Error advertising a topic. " 116 <<
"Did you forget to start the discovery service?" 126 public: std::vector<std::string> AdvertisedTopics()
const;
131 public:
bool Unadvertise(
const std::string &_topic);
137 public:
bool Publish(
const std::string &_topic,
148 const std::string &_topic,
149 void(*_cb)(
const T &_msg))
151 std::function<void(const T &)> f = [_cb](
const T & _internalMsg)
153 (*_cb)(_internalMsg);
156 return this->Subscribe<T>(_topic, f);
166 const std::string &_topic,
167 std::function<
void(
const T &_msg)> &_cb)
169 std::string fullyQualifiedTopic;
171 this->Options().NameSpace(), _topic, fullyQualifiedTopic))
173 std::cerr <<
"Topic [" << _topic <<
"] is not valid." << std::endl;
178 std::shared_ptr<SubscriptionHandler<T>> subscrHandlerPtr(
182 subscrHandlerPtr->SetCallback(_cb);
184 std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
190 this->Shared()->localSubscriptions.AddHandler(
191 fullyQualifiedTopic, this->NodeUuid(), subscrHandlerPtr);
194 this->TopicsSubscribed().insert(fullyQualifiedTopic);
197 if (!this->Shared()->discovery->DiscoverMsg(fullyQualifiedTopic))
199 std::cerr <<
"Node::Subscribe(): Error discovering a topic. " 200 <<
"Did you forget to start the discovery service?" 216 public:
template<
typename C,
typename T>
bool Subscribe(
217 const std::string &_topic,
218 void(C::*_cb)(
const T &_msg),
221 std::function<void(const T &)> f = [_cb, _obj](
const T & _internalMsg)
223 auto cb = std::bind(_cb, _obj, std::placeholders::_1);
227 return this->Subscribe<T>(_topic, f);
235 public: std::vector<std::string> SubscribedTopics()
const;
240 public:
bool Unsubscribe(
const std::string &_topic);
254 public:
template<
typename T1,
typename T2>
bool Advertise(
255 const std::string &_topic,
256 void(*_cb)(
const T1 &_req, T2 &_rep,
bool &_result),
259 std::function<void(const T1 &, T2 &, bool &)> f =
260 [_cb](
const T1 &_internalReq, T2 &_internalRep,
bool &_internalResult)
262 (*_cb)(_internalReq, _internalRep, _internalResult);
265 return this->Advertise<T1, T2>(_topic, f, _options);
280 public:
template<
typename T1,
typename T2>
bool Advertise(
281 const std::string &_topic,
282 std::function<
void(
const T1 &_req, T2 &_rep,
bool &_result)> &_cb,
285 std::string fullyQualifiedTopic;
287 this->Options().NameSpace(), _topic, fullyQualifiedTopic))
289 std::cerr <<
"Topic [" << _topic <<
"] is not valid." << std::endl;
294 std::shared_ptr<RepHandler<T1, T2>> repHandlerPtr(
298 repHandlerPtr->SetCallback(_cb);
300 std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
303 this->SrvsAdvertised().insert(fullyQualifiedTopic);
309 this->Shared()->repliers.AddHandler(
310 fullyQualifiedTopic, this->NodeUuid(), repHandlerPtr);
314 this->Shared()->myReplierAddress,
315 this->Shared()->replierId.ToString(),
316 this->Shared()->pUuid, this->NodeUuid(), _options.Scope(),
317 T1().GetTypeName(), T2().GetTypeName());
319 if (!this->Shared()->discovery->AdvertiseSrv(publisher))
321 std::cerr <<
"Node::Advertise(): Error advertising a service. " 322 <<
"Did you forget to start the discovery service?" 343 public:
template<
typename C,
typename T1,
typename T2>
bool Advertise(
344 const std::string &_topic,
345 void(C::*_cb)(
const T1 &_req, T2 &_rep,
bool &_result),
349 std::function<void(const T1 &, T2 &, bool &)> f =
350 [_cb, _obj](
const T1 &_internalReq,
352 bool &_internalResult)
354 auto cb = std::bind(_cb, _obj, std::placeholders::_1,
355 std::placeholders::_2, std::placeholders::_3);
356 cb(_internalReq, _internalRep, _internalResult);
359 return this->Advertise<T1, T2>(_topic, f, _options);
364 public: std::vector<std::string> AdvertisedServices()
const;
376 public:
template<
typename T1,
typename T2>
bool Request(
377 const std::string &_topic,
379 void(*_cb)(
const T2 &_rep,
const bool _result))
381 std::function<void(const T2 &, const bool)> f =
382 [_cb](
const T2 &_internalRep,
const bool _internalResult)
384 (*_cb)(_internalRep, _internalResult);
387 return this->Request<T1, T2>(_topic, _req, f);
400 public:
template<
typename T1,
typename T2>
bool Request(
401 const std::string &_topic,
403 std::function<
void(
const T2 &_rep,
const bool _result)> &_cb)
405 std::string fullyQualifiedTopic;
407 this->Options().NameSpace(), _topic, fullyQualifiedTopic))
409 std::cerr <<
"Topic [" << _topic <<
"] is not valid." << std::endl;
413 bool localResponserFound;
416 std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
417 localResponserFound = this->Shared()->repliers.FirstHandler(
418 fullyQualifiedTopic, T1().GetTypeName(), T2().GetTypeName(),
423 if (localResponserFound)
428 repHandler->RunLocalCallback(_req, rep, result);
435 std::shared_ptr<ReqHandler<T1, T2>> reqHandlerPtr(
439 reqHandlerPtr->SetMessage(_req);
442 reqHandlerPtr->SetCallback(_cb);
445 std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
448 this->Shared()->requests.AddHandler(
449 fullyQualifiedTopic, this->NodeUuid(), reqHandlerPtr);
453 if (this->Shared()->discovery->SrvPublishers(
454 fullyQualifiedTopic, addresses))
456 this->Shared()->SendPendingRemoteReqs(fullyQualifiedTopic,
457 T1().GetTypeName(), T2().GetTypeName());
462 if (!this->Shared()->discovery->DiscoverSrv(fullyQualifiedTopic))
464 std::cerr <<
"Node::Request(): Error discovering a service. " 465 <<
"Did you forget to start the discovery service?" 486 public:
template<
typename C,
typename T1,
typename T2>
bool Request(
487 const std::string &_topic,
489 void(C::*_cb)(
const T2 &_rep,
const bool _result),
492 std::function<void(const T2 &, const bool)> f =
493 [_cb, _obj](
const T2 &_internalRep,
const bool _internalResult)
495 auto cb = std::bind(_cb, _obj, std::placeholders::_1,
496 std::placeholders::_2);
497 cb(_internalRep, _internalResult);
500 return this->Request<T1, T2>(_topic, _req, f);
511 public:
template<
typename T1,
typename T2>
bool Request(
512 const std::string &_topic,
514 const unsigned int &_timeout,
518 std::string fullyQualifiedTopic;
520 this->Options().NameSpace(), _topic, fullyQualifiedTopic))
522 std::cerr <<
"Topic [" << _topic <<
"] is not valid." << std::endl;
527 std::shared_ptr<ReqHandler<T1, T2>> reqHandlerPtr(
531 reqHandlerPtr->SetMessage(_req);
533 std::unique_lock<std::recursive_mutex> lk(this->Shared()->mutex);
537 if (this->Shared()->repliers.FirstHandler(fullyQualifiedTopic,
538 T1().GetTypeName(), T2().GetTypeName(), repHandler))
541 repHandler->RunLocalCallback(_req, _rep, _result);
546 this->Shared()->requests.AddHandler(
547 fullyQualifiedTopic, this->NodeUuid(), reqHandlerPtr);
551 if (this->Shared()->discovery->SrvPublishers(
552 fullyQualifiedTopic, addresses))
554 this->Shared()->SendPendingRemoteReqs(fullyQualifiedTopic,
555 T1().GetTypeName(), T2().GetTypeName());
560 if (!this->Shared()->discovery->DiscoverSrv(fullyQualifiedTopic))
562 std::cerr <<
"Node::Request(): Error discovering a service. " 563 <<
"Did you forget to start the discovery service?" 570 bool executed = reqHandlerPtr->WaitUntil(lk, _timeout);
577 if (!reqHandlerPtr->Result())
584 if (!_rep.ParseFromString(reqHandlerPtr->Response()))
586 std::cerr <<
"Node::Request(): Error Parsing the response" 599 public:
bool UnadvertiseSrv(
const std::string &_topic);
607 public:
void TopicList(std::vector<std::string> &_topics)
const;
613 public:
bool TopicInfo(
const std::string &_topic,
614 std::vector<MessagePublisher> &_publishers)
const;
622 public:
void ServiceList(std::vector<std::string> &_services)
const;
628 public:
bool ServiceInfo(
const std::string &_service,
629 std::vector<ServicePublisher> &_publishers)
const;
633 private:
const std::string &Partition()
const;
637 private:
const std::string &NameSpace()
const;
646 private:
const std::string &NodeUuid()
const;
650 private: std::unordered_set<std::string> &TopicsAdvertised()
const;
654 private: std::unordered_set<std::string> &TopicsSubscribed()
const;
658 private: std::unordered_set<std::string> &SrvsAdvertised()
const;
666 protected: std::unique_ptr<transport::NodePrivate>
dataPtr;
bool Advertise(const std::string &_topic, const AdvertiseOptions &_options=AdvertiseOptions())
Advertise a new topic.
Definition: Node.hh:79
static bool FullyQualifiedName(const std::string &_partition, const std::string &_ns, const std::string &_topic, std::string &_name)
Get the full topic path given a namespace and a topic name.
#define IGNITION_VISIBLE
Use to represent "symbol visible" if supported.
Definition: Helpers.hh:56
std::map< std::string, std::vector< ServicePublisher > > SrvAddresses_M
Definition: TransportTypes.hh:58
bool Request(const std::string &_topic, const T1 &_req, void(*_cb)(const T2 &_rep, const bool _result))
Request a new service using a non-blocking call.
Definition: Node.hh:376
A class for customizing the behavior of the Node.
Definition: NodeOptions.hh:35
It creates a reply handler for the specific protobuf messages used.
Definition: ReqHandler.hh:175
bool Subscribe(const std::string &_topic, void(*_cb)(const T &_msg))
Subscribe to a topic registering a callback.
Definition: Node.hh:147
ignition/transport/AdvertiseOptions.hh
Definition: AdvertiseOptions.hh:50
bool Advertise(const std::string &_topic, std::function< void(const T1 &_req, T2 &_rep, bool &_result)> &_cb, const AdvertiseOptions &_options=AdvertiseOptions())
Advertise a new service.
Definition: Node.hh:280
Private data for the Node class.
Definition: NodeShared.hh:53
google::protobuf::Message ProtoMsg
Definition: TransportTypes.hh:62
std::shared_ptr< IRepHandler > IRepHandlerPtr
Definition: TransportTypes.hh:82
bool Request(const std::string &_topic, const T1 &_req, std::function< void(const T2 &_rep, const bool _result)> &_cb)
Request a new service using a non-blocking call.
Definition: Node.hh:400
bool Advertise(const std::string &_topic, void(C::*_cb)(const T1 &_req, T2 &_rep, bool &_result), C *_obj, const AdvertiseOptions &_options=AdvertiseOptions())
Advertise a new service.
Definition: Node.hh:343
bool Subscribe(const std::string &_topic, std::function< void(const T &_msg)> &_cb)
Subscribe to a topic registering a callback.
Definition: Node.hh:165
with the service response.
Definition: RepHandler.hh:102
std::unique_ptr< transport::NodePrivate > dataPtr
Definition: Node.hh:666
A class that allows a client to communicate with other peers.
Definition: Node.hh:64
ignition/transport/Publisher.hh
Definition: Publisher.hh:175
ignition/transport/Publisher.hh
Definition: Publisher.hh:264
bool Advertise(const std::string &_topic, void(*_cb)(const T1 &_req, T2 &_rep, bool &_result), const AdvertiseOptions &_options=AdvertiseOptions())
Advertise a new service.
Definition: Node.hh:254
It creates a subscription handler for a specific protobuf message.
Definition: SubscriptionHandler.hh:102
bool Request(const std::string &_topic, const T1 &_req, const unsigned int &_timeout, T2 &_rep, bool &_result)
Request a new service using a blocking call.
Definition: Node.hh:511
Definition: AdvertiseOptions.hh:25
bool Request(const std::string &_topic, const T1 &_req, void(C::*_cb)(const T2 &_rep, const bool _result), C *_obj)
Request a new service using a non-blocking call.
Definition: Node.hh:486
IGNITION_VISIBLE void waitForShutdown()
Block the current thread until a SIGINT or SIGTERM is received.
bool Subscribe(const std::string &_topic, void(C::*_cb)(const T &_msg), C *_obj)
Subscribe to a topic registering a callback.
Definition: Node.hh:216