GRPC C++  1.26.0
default_health_check_service.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H
20 #define GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H
21 
22 #include <atomic>
23 #include <set>
24 
25 #include <grpc/support/log.h>
26 #include <grpcpp/grpcpp.h>
33 
35 #include "src/core/lib/gprpp/thd.h"
36 
37 namespace grpc {
38 
39 // Default implementation of HealthCheckServiceInterface. Server will create and
40 // own it.
42  public:
44 
45  // The service impl to register with the server.
47  public:
48  // Base class for call handlers.
49  class CallHandler {
50  public:
51  virtual ~CallHandler() = default;  // Virtual: handlers are held via std::shared_ptr<CallHandler>.
52  virtual void SendHealth(std::shared_ptr<CallHandler> self,
53  ServingStatus status) = 0;  // Pure virtual; each handler type supplies its own behavior.
54  };
55 
57  std::unique_ptr<ServerCompletionQueue> cq);
58 
60 
61  void StartServingThread();
62 
63  private:
64  // A tag that can be called with a bool argument. It's tailored for
65  // CallHandler's use. Before being used, it should be constructed with a
66  // method of CallHandler and a shared pointer to the handler. The
67  // shared pointer will be moved to the invoked function and the function
68  // can only be invoked once. That makes ref counting of the handler easier,
69  // because the shared pointer is not bound to the function and can be gone
70  // once the invoked function returns (if not used any more).
71  class CallableTag {
72  public:
73  using HandlerFunction =
74  std::function<void(std::shared_ptr<CallHandler>, bool)>;
75 
76  CallableTag() {}  // Empty tag: Run() fails its GPR_ASSERTs until a real tag is assigned.
77 
78  CallableTag(HandlerFunction func, std::shared_ptr<CallHandler> handler)
79  : handler_function_(std::move(func)), handler_(std::move(handler)) {
80  GPR_ASSERT(handler_function_ != nullptr);
81  GPR_ASSERT(handler_ != nullptr);
82  }
83 
84  // Runs the tag. Call only once: handler_ is moved into the callback, so
85  // this tag no longer owns the handler once this method is invoked.
86  void Run(bool ok) {
87  GPR_ASSERT(handler_function_ != nullptr);
88  GPR_ASSERT(handler_ != nullptr);
89  handler_function_(std::move(handler_), ok);
90  }
91 
92  // Releases and returns the shared pointer to the handler.
93  std::shared_ptr<CallHandler> ReleaseHandler() {
94  return std::move(handler_);  // Leaves handler_ empty.
95  }
96 
97  private:
98  HandlerFunction handler_function_ = nullptr;
99  std::shared_ptr<CallHandler> handler_;
100  };
101 
102  // Call handler for Check method.
103  // Each handler takes care of one call. It contains per-call data and it
104  // will access the members of the parent class (i.e.,
105  // DefaultHealthCheckService) for per-service health data.
106  class CheckCallHandler : public CallHandler {
107  public:
108  // Instantiates a CheckCallHandler and requests the next health check
109  // call. The handler object will manage its own lifetime, so no action is
110  // needed from the caller any more regarding that object.
111  static void CreateAndStart(ServerCompletionQueue* cq,
112  DefaultHealthCheckService* database,
113  HealthCheckServiceImpl* service);
114 
115  // This ctor is public because we want to use std::make_shared<> in
116  // CreateAndStart(). This ctor shouldn't be used elsewhere.
117  CheckCallHandler(ServerCompletionQueue* cq,
118  DefaultHealthCheckService* database,
119  HealthCheckServiceImpl* service);
120 
121  // Not used for Check; this override is intentionally a no-op.
122  void SendHealth(std::shared_ptr<CallHandler> /*self*/,
123  ServingStatus /*status*/) override {}
124 
125  private:
126  // Called when we receive a call.
127  // Spawns a new handler so that we can keep servicing future calls.
128  void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
129 
130  // Called when Finish() is done.
131  void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
132 
133  // The members passed down from HealthCheckServiceImpl.
135  DefaultHealthCheckService* database_;
136  HealthCheckServiceImpl* service_;
137 
138  ByteBuffer request_;
140  ServerContext ctx_;
141 
142  CallableTag next_;
143  };
144 
145  // Call handler for Watch method.
146  // Each handler takes care of one call. It contains per-call data and it
147  // will access the members of the parent class (i.e.,
148  // DefaultHealthCheckService) for per-service health data.
149  class WatchCallHandler : public CallHandler {
150  public:
151  // Instantiates a WatchCallHandler and requests the next health check
152  // call. The handler object will manage its own lifetime, so no action is
153  // needed from the caller any more regarding that object.
154  static void CreateAndStart(ServerCompletionQueue* cq,
155  DefaultHealthCheckService* database,
156  HealthCheckServiceImpl* service);
157 
158  // This ctor is public because we want to use std::make_shared<> in
159  // CreateAndStart(). This ctor shouldn't be used elsewhere.
160  WatchCallHandler(ServerCompletionQueue* cq,
161  DefaultHealthCheckService* database,
162  HealthCheckServiceImpl* service);
163 
164  void SendHealth(std::shared_ptr<CallHandler> self,
165  ServingStatus status) override;
166 
167  private:
168  // Called when we receive a call.
169  // Spawns a new handler so that we can keep servicing future calls.
170  void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
171 
172  // Requires holding send_mu_.
173  void SendHealthLocked(std::shared_ptr<CallHandler> self,
174  ServingStatus status);
175 
176  // Called when sending a health result finishes.
177  void OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok);
178 
179  void SendFinish(std::shared_ptr<CallHandler> self, const Status& status);
180 
181  // Requires holding service_->cq_shutdown_mu_.
182  void SendFinishLocked(std::shared_ptr<CallHandler> self,
183  const Status& status);
184 
185  // Called when Finish() is done.
186  void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
187 
188  // Called when AsyncNotifyWhenDone() notifies us.
189  void OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok);
190 
191  // The members passed down from HealthCheckServiceImpl.
193  DefaultHealthCheckService* database_;
194  HealthCheckServiceImpl* service_;
195 
196  ByteBuffer request_;
197  grpc::string service_name_;
198  GenericServerAsyncWriter stream_;
199  ServerContext ctx_;
200 
201  grpc_core::Mutex send_mu_;
202  bool send_in_flight_ = false; // Guarded by send_mu_.
203  ServingStatus pending_status_ = NOT_FOUND; // Guarded by send_mu_.
204 
205  bool finish_called_ = false;
206  CallableTag next_;
207  CallableTag on_done_notified_;
208  CallableTag on_finish_done_;
209  };
210 
211  // Handles the incoming requests and drives the completion queue in a loop.
212  static void Serve(void* arg);
213 
214  // Returns true on success.
215  static bool DecodeRequest(const ByteBuffer& request,
216  grpc::string* service_name);
217  static bool EncodeResponse(ServingStatus status, ByteBuffer* response);
218 
219  // Needed to appease Windows compilers, which don't seem to allow
220  // nested classes to access protected members in the parent's
221  // superclass.
224 
225  DefaultHealthCheckService* database_;
226  std::unique_ptr<ServerCompletionQueue> cq_;
227 
228  // To synchronize the operations related to shutdown state of cq_, so that
229  // we don't enqueue new tags into cq_ after it is already shut down.
230  grpc_core::Mutex cq_shutdown_mu_;
231  std::atomic_bool shutdown_{false};
232  std::unique_ptr<::grpc_core::Thread> thread_;
233  };
234 
236 
237  void SetServingStatus(const grpc::string& service_name,
238  bool serving) override;
239  void SetServingStatus(bool serving) override;
240 
241  void Shutdown() override;
242 
243  ServingStatus GetServingStatus(const grpc::string& service_name) const;
244 
245  HealthCheckServiceImpl* GetHealthCheckService(
246  std::unique_ptr<ServerCompletionQueue> cq);
247 
248  private:
249  // Stores the current serving status of a service and any call
250  // handlers registered for updates when the service's status changes.
251  class ServiceData {
252  public:
253  void SetServingStatus(ServingStatus status);
254  ServingStatus GetServingStatus() const { return status_; }
255  void AddCallHandler(
256  std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
257  void RemoveCallHandler(
258  const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler);
259  bool Unused() const {  // True iff no handlers are registered and status is NOT_FOUND.
260  return call_handlers_.empty() && status_ == NOT_FOUND;
261  }
262 
263  private:
264  ServingStatus status_ = NOT_FOUND;  // Initial status.
265  std::set<std::shared_ptr<HealthCheckServiceImpl::CallHandler>>
266  call_handlers_;
267  };
268 
269  void RegisterCallHandler(
270  const grpc::string& service_name,
271  std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
272 
273  void UnregisterCallHandler(
274  const grpc::string& service_name,
275  const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler);
276 
277  mutable grpc_core::Mutex mu_;
278  bool shutdown_ = false; // Guarded by mu_.
279  std::map<grpc::string, ServiceData> services_map_; // Guarded by mu_.
280  std::unique_ptr<HealthCheckServiceImpl> impl_;
281 };
282 
283 } // namespace grpc
284 
285 #endif // GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H
virtual void SendHealth(std::shared_ptr< CallHandler > self, ServingStatus status)=0
Definition: default_health_check_service.h:46
void StartServingThread()
Definition: default_health_check_service.cc:174
~HealthCheckServiceImpl()
Definition: default_health_check_service.cc:164
HealthCheckServiceImpl(DefaultHealthCheckService *database, std::unique_ptr< ServerCompletionQueue > cq)
Definition: default_health_check_service.cc:149
Definition: default_health_check_service.h:41
DefaultHealthCheckService()
Definition: default_health_check_service.cc:37
ServingStatus GetServingStatus(const grpc::string &service_name) const
Definition: default_health_check_service.cc:76
void SetServingStatus(const grpc::string &service_name, bool serving) override
Set or change the serving status of the given service_name.
Definition: default_health_check_service.cc:41
HealthCheckServiceImpl * GetHealthCheckService(std::unique_ptr< ServerCompletionQueue > cq)
Definition: default_health_check_service.cc:111
void Shutdown() override
Set all registered service names to not serving and prevent future state changes.
Definition: default_health_check_service.cc:63
ServingStatus
Definition: default_health_check_service.h:43
@ SERVING
Definition: default_health_check_service.h:43
@ NOT_SERVING
Definition: default_health_check_service.h:43
@ NOT_FOUND
Definition: default_health_check_service.h:43
Descriptor of an RPC service and its various RPC methods.
Definition: service_type.h:60
void RequestAsyncServerStreaming(int index, ::grpc_impl::ServerContext *context, Message *request, internal::ServerAsyncStreamingInterface *stream, ::grpc_impl::CompletionQueue *call_cq, ::grpc_impl::ServerCompletionQueue *notification_cq, void *tag)
Definition: service_type.h:172
void RequestAsyncUnary(int index, ::grpc_impl::ServerContext *context, Message *request, internal::ServerAsyncStreamingInterface *stream, ::grpc_impl::CompletionQueue *call_cq, ::grpc_impl::ServerCompletionQueue *notification_cq, void *tag)
Definition: service_type.h:149
Definition: sync.h:40
The gRPC server uses this interface to expose the health checking service without depending on protob...
Definition: health_check_service_interface_impl.h:28
#define GPR_ASSERT(x)
abort() the process if x is zero, having written a line to the log.
Definition: log.h:94
::google::protobuf::util::Status Status
Definition: config_protobuf.h:90
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInp...
Definition: alarm.h:24
::grpc_impl::ServerCompletionQueue ServerCompletionQueue
Definition: completion_queue.h:27
::grpc_impl::ServerContext ServerContext
Definition: server_context.h:26
::grpc_impl::ServerAsyncResponseWriter< ByteBuffer > GenericServerAsyncResponseWriter
Definition: async_generic_service.h:34
std::string string
Definition: config.h:35
::grpc_impl::ServerAsyncWriter< ByteBuffer > GenericServerAsyncWriter
Definition: async_generic_service.h:37
Definition: async_unary_call_impl.h:301