GRPC Core  9.0.0
subchannel_list.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
20 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
21 
23 
24 #include <string.h>
25 
26 #include <grpc/support/alloc.h>
27 
30 // TODO(roth): Should not need the include of subchannel.h here, since
31 // that implementation should be hidden from the LB policy API.
43 
44 // Code for maintaining a list of subchannels within an LB policy.
45 //
46 // To use this, callers must create their own subclasses, like so:
47 /*
48 
49 class MySubchannelList; // Forward declaration.
50 
51 class MySubchannelData
52  : public SubchannelData<MySubchannelList, MySubchannelData> {
53  public:
54  void ProcessConnectivityChangeLocked(
55  grpc_connectivity_state connectivity_state) override {
56  // ...code to handle connectivity changes...
57  }
58 };
59 
60 class MySubchannelList
61  : public SubchannelList<MySubchannelList, MySubchannelData> {
62 };
63 
64 */
65 // All methods will be called from within the client_channel combiner.
66 
67 namespace grpc_core {
68 
69 // Forward declaration.
70 template <typename SubchannelListType, typename SubchannelDataType>
71 class SubchannelList;
72 
73 // Stores data for a particular subchannel in a subchannel list.
74 // Callers must create a subclass that implements the
75 // ProcessConnectivityChangeLocked() method.
// NOTE(review): this listing omits several declaration lines of the class
// (listing lines 77, 97, 111, 120-121, 123, 130, 136, 139-140, 153, 161, 163
// and 165), including the class header itself. The symbol index at the end of
// this page names the class SubchannelData and lists the signatures of the
// constructor, CheckConnectivityStateLocked(), StartConnectivityWatchLocked()
// and ProcessConnectivityChangeLocked(); consult the real header for the rest.
76 template <typename SubchannelListType, typename SubchannelDataType>
78  public:
79  // Returns a pointer to the subchannel list containing this object.
80  SubchannelListType* subchannel_list() const {
81  return static_cast<SubchannelListType*>(subchannel_list_);
82  }
83 
84  // Returns the index into the subchannel list of this object.
// Computed by pointer subtraction against element 0 of the owning list, so
// this object must be an element of that list's subchannel storage.
85  size_t Index() const {
86  return static_cast<size_t>(static_cast<const SubchannelDataType*>(this) -
87  subchannel_list_->subchannel(0));
88  }
89 
90  // Returns a pointer to the subchannel.
// The result is null once UnrefSubchannelLocked() has reset subchannel_.
91  SubchannelInterface* subchannel() const { return subchannel_.get(); }
92 
93  // Synchronously checks the subchannel's connectivity state.
94  // Must not be called while there is a connectivity notification
95  // pending (i.e., between calling StartConnectivityWatchLocked() and
96  // calling CancelConnectivityWatchLocked()).
// The precondition is enforced with GPR_ASSERT; the polled state is cached in
// connectivity_state_ before being returned.
98  GPR_ASSERT(pending_watcher_ == nullptr);
99  connectivity_state_ = subchannel_->CheckConnectivityState();
100  return connectivity_state_;
101  }
102 
103  // Resets the connection backoff.
104  // TODO(roth): This method should go away when we move the backoff
105  // code out of the subchannel and into the LB policies.
106  void ResetBackoffLocked();
107 
108  // Starts watching the connectivity state of the subchannel.
109  // ProcessConnectivityChangeLocked() will be called whenever the
110  // connectivity state changes.
112 
113  // Cancels watching the connectivity state of the subchannel.
// `reason` is only used in the trace log message.
114  void CancelConnectivityWatchLocked(const char* reason);
115 
116  // Cancels any pending connectivity watch and unrefs the subchannel.
117  void ShutdownLocked();
118 
119  protected:
122  const ServerAddress& address,
124 
// The destructor asserts that the subchannel has already been released, so
// ShutdownLocked() must have run before destruction.
125  virtual ~SubchannelData();
126 
127  // After StartConnectivityWatchLocked() is called, this method will be
128  // invoked whenever the subchannel's connectivity state changes.
129  // To stop watching, use CancelConnectivityWatchLocked().
131  grpc_connectivity_state connectivity_state) = 0;
132 
133  private:
134  // Watcher for subchannel connectivity state.
// Holds a ref-counted pointer to the list for as long as the watcher exists,
// plus a pointer back to the SubchannelData it reports to.
135  class Watcher
137  public:
138  Watcher(
141  : subchannel_data_(subchannel_data),
142  subchannel_list_(std::move(subchannel_list)) {}
143 
// Releases the list ref taken when the watch was started.
144  ~Watcher() { subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor"); }
145 
146  void OnConnectivityStateChange(grpc_connectivity_state new_state) override;
147 
// Delegates to the LB policy that owns the list.
148  grpc_pollset_set* interested_parties() override {
149  return subchannel_list_->policy()->interested_parties();
150  }
151 
152  private:
154  RefCountedPtr<SubchannelListType> subchannel_list_;
155  };
156 
157  // Unrefs the subchannel.
158  void UnrefSubchannelLocked(const char* reason);
159 
160  // Backpointer to owning subchannel list. Not owned.
162  // The subchannel.
164  // Will be non-null when the subchannel's state is being watched.
166  nullptr;
167  // Data updated by the watcher.
// Also overwritten by CheckConnectivityStateLocked(); starts as
// GRPC_CHANNEL_IDLE (set in the constructor).
168  grpc_connectivity_state connectivity_state_;
169 };
170 
171 // A list of subchannels.
// NOTE(review): listing lines 175, 197, 201 and 203 are absent here. Per the
// symbol index at the end of this page, line 175 declares
// `InlinedVector<SubchannelDataType, 10> SubchannelVector`, and the
// constructor also takes a LoadBalancingPolicy::ChannelControlHelper* helper.
172 template <typename SubchannelListType, typename SubchannelDataType>
173 class SubchannelList : public InternallyRefCounted<SubchannelListType> {
174  public:
176 
177  // The number of subchannels in the list.
// This can be smaller than the number of addresses passed to the constructor,
// which skips balancer addresses and addresses whose subchannel could not be
// created.
178  size_t num_subchannels() const { return subchannels_.size(); }
179 
180  // The data for the subchannel at a particular index.
// No bounds check is performed here; callers must pass
// index < num_subchannels().
181  SubchannelDataType* subchannel(size_t index) { return &subchannels_[index]; }
182 
183  // Returns true if the subchannel list is shutting down.
184  bool shutting_down() const { return shutting_down_; }
185 
186  // Accessors.
187  LoadBalancingPolicy* policy() const { return policy_; }
188  TraceFlag* tracer() const { return tracer_; }
189 
190  // Resets connection backoff of all subchannels.
191  // TODO(roth): We will probably need to rethink this as part of moving
192  // the backoff code out of subchannels and into LB policies.
193  void ResetBackoffLocked();
194 
// Shuts down every subchannel in the list. ShutdownLocked() asserts that it
// has not run before, so Orphan() must be invoked at most once.
// NOTE(review): the statement on listing line 197 is missing from this
// listing - presumably it drops the list's own ref; confirm in the header.
195  void Orphan() override {
196  ShutdownLocked();
198  }
199 
200  protected:
202  const ServerAddressList& addresses,
204  const grpc_channel_args& args);
205 
206  virtual ~SubchannelList();
207 
208  private:
209  // For accessing Ref() and Unref().
210  friend class SubchannelData<SubchannelListType, SubchannelDataType>;
211 
212  void ShutdownLocked();
213 
214  // Backpointer to owning policy.
215  LoadBalancingPolicy* policy_;
216 
// Trace flag checked before every log statement emitted by the list and its
// subchannel data objects.
217  TraceFlag* tracer_;
218 
219  // The list of subchannels.
220  SubchannelVector subchannels_;
221 
222  // Is this list shutting down? This may be true due to the shutdown of the
223  // policy itself or because a newer update has arrived while this one hadn't
224  // finished processing.
225  bool shutting_down_ = false;
226 };
227 
228 //
229 // implementation -- no user-serviceable parts below
230 //
231 
232 //
233 // SubchannelData::Watcher
234 //
235 
// Body of the Watcher's connectivity-state callback.
// NOTE(review): the signature (listing lines 237-238) and the opening of the
// log call (line 240) are missing from this listing; the class declaration
// names the method OnConnectivityStateChange(grpc_connectivity_state
// new_state).
236 template <typename SubchannelListType, typename SubchannelDataType>
// When tracing is enabled, log the new state together with the list's
// shutdown flag and the current pending watcher pointer.
239  if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
241  "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
242  " (subchannel %p): connectivity changed: state=%s, "
243  "shutting_down=%d, pending_watcher=%p",
244  subchannel_list_->tracer()->name(), subchannel_list_->policy(),
245  subchannel_list_.get(), subchannel_data_->Index(),
246  subchannel_list_->num_subchannels(),
247  subchannel_data_->subchannel_.get(),
248  ConnectivityStateName(new_state), subchannel_list_->shutting_down(),
249  subchannel_data_->pending_watcher_);
250  }
// The notification is dropped when the list is shutting down or when the
// watch has been cancelled (CancelConnectivityWatchLocked() nulls
// pending_watcher_). Otherwise the new state is cached before the subclass
// hook runs.
251  if (!subchannel_list_->shutting_down() &&
252  subchannel_data_->pending_watcher_ != nullptr) {
253  subchannel_data_->connectivity_state_ = new_state;
254  // Call the subclass's ProcessConnectivityChangeLocked() method.
255  subchannel_data_->ProcessConnectivityChangeLocked(new_state);
256  }
257 }
258 
259 //
260 // SubchannelData
261 //
262 
// SubchannelData constructor.
// NOTE(review): listing lines 264-265 and 267 (the start of the signature and
// the subchannel parameter) are missing here; the symbol index at the end of
// this page lists the parameters as (subchannel_list, address, subchannel).
// The address parameter is left unnamed and is not used by this constructor.
263 template <typename SubchannelListType, typename SubchannelDataType>
266  const ServerAddress& /*address*/,
// subchannel_list is stored as a plain pointer; the subchannel ref is moved
// into subchannel_.
268  : subchannel_list_(subchannel_list),
269  subchannel_(std::move(subchannel)),
270  // We assume that the current state is IDLE. If not, we'll get a
271  // callback telling us that.
272  connectivity_state_(GRPC_CHANNEL_IDLE) {}
273 
// SubchannelData destructor (its signature, listing line 275, is missing
// here). It releases nothing itself: it only asserts that the subchannel was
// already released, which UnrefSubchannelLocked() does via ShutdownLocked().
274 template <typename SubchannelListType, typename SubchannelDataType>
276  GPR_ASSERT(subchannel_ == nullptr);
277 }
278 
// Releases this object's reference to the subchannel, if it still holds one.
// `reason` is only used in the trace log message. Calling this again after
// the subchannel has been released is a no-op.
// NOTE(review): listing lines 280 (start of the signature) and 284 (opening
// of the log call) are missing from this listing.
279 template <typename SubchannelListType, typename SubchannelDataType>
281  UnrefSubchannelLocked(const char* reason) {
282  if (subchannel_ != nullptr) {
283  if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
285  "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
286  " (subchannel %p): unreffing subchannel (%s)",
287  subchannel_list_->tracer()->name(), subchannel_list_->policy(),
288  subchannel_list_, Index(), subchannel_list_->num_subchannels(),
289  subchannel_.get(), reason);
290  }
// Leaves subchannel_ null, which is what the destructor asserts.
291  subchannel_.reset();
292  }
293 }
294 
295 template <typename SubchannelListType, typename SubchannelDataType>
296 void SubchannelData<SubchannelListType,
297  SubchannelDataType>::ResetBackoffLocked() {
298  // Nothing to reset once the subchannel has been released.
299  if (subchannel_ == nullptr) return;
300  subchannel_->ResetBackoff();
301 }
302 
// Starts a connectivity watch on the subchannel, reporting from the currently
// cached connectivity_state_. Asserts that no watch is already pending.
// NOTE(review): listing line 307 (opening of the log call) is missing from
// this listing.
303 template <typename SubchannelListType, typename SubchannelDataType>
304 void SubchannelData<SubchannelListType,
305  SubchannelDataType>::StartConnectivityWatchLocked() {
306  if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
308  "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
309  " (subchannel %p): starting watch (from %s)",
310  subchannel_list_->tracer()->name(), subchannel_list_->policy(),
311  subchannel_list_, Index(), subchannel_list_->num_subchannels(),
312  subchannel_.get(), ConnectivityStateName(connectivity_state_));
313  }
// Only one watch may be outstanding per subchannel data object.
314  GPR_ASSERT(pending_watcher_ == nullptr);
// The watcher takes its own ref on the list (tagged "Watcher"), which
// ~Watcher() releases.
315  pending_watcher_ =
316  new Watcher(this, subchannel_list()->Ref(DEBUG_LOCATION, "Watcher"));
// The watcher is handed to the subchannel wrapped in a unique_ptr. The raw
// pointer kept in pending_watcher_ is what CancelConnectivityWatchLocked()
// later passes to identify the watch.
317  subchannel_->WatchConnectivityState(
318  connectivity_state_,
319  std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>(
320  pending_watcher_));
321 }
322 
// Cancels the pending connectivity watch, if any. `reason` is only used in
// the trace log message, which is emitted even when no watch is pending.
// NOTE(review): listing lines 324 (start of the signature) and 327 (opening
// of the log call) are missing from this listing.
323 template <typename SubchannelListType, typename SubchannelDataType>
325  CancelConnectivityWatchLocked(const char* reason) {
326  if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
328  "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
329  " (subchannel %p): canceling connectivity watch (%s)",
330  subchannel_list_->tracer()->name(), subchannel_list_->policy(),
331  subchannel_list_, Index(), subchannel_list_->num_subchannels(),
332  subchannel_.get(), reason);
333  }
// Clearing pending_watcher_ also makes the Watcher callback ignore any
// notification delivered after this point.
334  if (pending_watcher_ != nullptr) {
335  subchannel_->CancelConnectivityStateWatch(pending_watcher_);
336  pending_watcher_ = nullptr;
337  }
338 }
339 
// Shuts down this subchannel data object (its signature, listing line 341, is
// missing here): cancels the watch if one is pending, then releases the
// subchannel. Both steps pass "shutdown" as the reason for the trace log.
340 template <typename SubchannelListType, typename SubchannelDataType>
342  if (pending_watcher_ != nullptr) CancelConnectivityWatchLocked("shutdown");
343  UnrefSubchannelLocked("shutdown");
344 }
345 
346 //
347 // SubchannelList
348 //
349 
// SubchannelList constructor: creates one SubchannelData entry per usable
// address in `addresses`.
//
// Addresses flagged as balancers, and addresses for which the helper returns
// a null subchannel, are skipped. An entry's index in subchannels_ therefore
// need not equal the index of its address in `addresses`.
//
// NOTE(review): several lines are missing from this listing: 351 and 354
// (start of the signature and the `helper` parameter), 360/400/410 (openings
// of the log calls), 370 (rest of keys_to_remove), 389 (declaration of
// new_args) and 393 (declaration of subchannel). The symbol index at the end
// of this page lists the full constructor signature.
350 template <typename SubchannelListType, typename SubchannelDataType>
352  LoadBalancingPolicy* policy, TraceFlag* tracer,
353  const ServerAddressList& addresses,
355  const grpc_channel_args& args)
356  : InternallyRefCounted<SubchannelListType>(tracer),
357  policy_(policy),
358  tracer_(tracer) {
359  if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
361  "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",
362  tracer_->name(), policy, this, addresses.size());
363  }
// At most addresses.size() entries are appended below.
364  subchannels_.reserve(addresses.size());
365  // We need to remove the LB addresses in order to be able to compare the
366  // subchannel keys of subchannels from a different batch of addresses.
367  // We remove the service config, since it will be passed into the
368  // subchannel via call context.
369  static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
371  // Create a subchannel for each address.
372  for (size_t i = 0; i < addresses.size(); i++) {
373  // TODO(roth): we should ideally hide this from the LB policy code. In
374  // principle, if we're dealing with this special case in the client_channel
375  // code for selecting grpclb, then we should also strip out these addresses
376  // there if we're not using grpclb.
377  if (addresses[i].IsBalancer()) {
378  continue;
379  }
// Args added for this subchannel: the address arg first, then any
// per-address args.
380  InlinedVector<grpc_arg, 3> args_to_add;
381  const size_t subchannel_address_arg_index = args_to_add.size();
382  args_to_add.emplace_back(
383  Subchannel::CreateSubchannelAddressArg(&addresses[i].address()));
384  if (addresses[i].args() != nullptr) {
385  for (size_t j = 0; j < addresses[i].args()->num_args; ++j) {
386  args_to_add.emplace_back(addresses[i].args()->args[j]);
387  }
388  }
390  &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove),
391  args_to_add.data(), args_to_add.size());
// The address arg's string is freed as soon as the new args have been built.
// NOTE(review): presumably the copy above does not alias this string -
// confirm against the channel_args implementation.
392  gpr_free(args_to_add[subchannel_address_arg_index].value.string);
394  helper->CreateSubchannel(*new_args);
395  grpc_channel_args_destroy(new_args);
396  if (subchannel == nullptr) {
397  // Subchannel could not be created.
398  if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
399  char* address_uri = grpc_sockaddr_to_uri(&addresses[i].address());
401  "[%s %p] could not create subchannel for address uri %s, "
402  "ignoring",
403  tracer_->name(), policy_, address_uri);
404  gpr_free(address_uri);
405  }
406  continue;
407  }
// subchannels_.size() is logged before the emplace_back below, so it is the
// index the new entry will occupy.
408  if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
409  char* address_uri = grpc_sockaddr_to_uri(&addresses[i].address());
411  "[%s %p] subchannel list %p index %" PRIuPTR
412  ": Created subchannel %p for address uri %s",
413  tracer_->name(), policy_, this, subchannels_.size(),
414  subchannel.get(), address_uri);
415  gpr_free(address_uri);
416  }
417  subchannels_.emplace_back(this, addresses[i], std::move(subchannel));
418  }
419 }
420 
// SubchannelList destructor (its signature, listing line 422, is missing
// here). The visible body only emits a trace log line.
421 template <typename SubchannelListType, typename SubchannelDataType>
423  if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
424  gpr_log(GPR_INFO, "[%s %p] Destroying subchannel_list %p", tracer_->name(),
425  policy_, this);
426  }
427 }
428 
// Shuts down the whole list (its signature, listing line 430, is missing
// here). Asserts that shutdown has not already begun, sets the shutting-down
// flag, then shuts down every entry in index order.
429 template <typename SubchannelListType, typename SubchannelDataType>
431  if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
432  gpr_log(GPR_INFO, "[%s %p] Shutting down subchannel_list %p",
433  tracer_->name(), policy_, this);
434  }
435  GPR_ASSERT(!shutting_down_);
// The flag is set before the per-entry shutdown; the Watcher callback checks
// it and ignores notifications once it is true.
436  shutting_down_ = true;
437  for (size_t i = 0; i < subchannels_.size(); i++) {
438  SubchannelDataType* sd = &subchannels_[i];
439  sd->ShutdownLocked();
440  }
441 }
442 
443 template <typename SubchannelListType, typename SubchannelDataType>
444 void SubchannelList<SubchannelListType,
445  SubchannelDataType>::ResetBackoffLocked() {
446  // Forward the backoff reset to every entry, in index order.
447  for (size_t idx = 0; idx < subchannels_.size(); ++idx) {
448  subchannels_[idx].ResetBackoffLocked();
449  }
450 }
451 
452 } // namespace grpc_core
453 
454 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H */
void grpc_channel_args_destroy(grpc_channel_args *a)
Destroy arguments created by grpc_channel_args_copy.
Definition: channel_args.cc:197
grpc_channel_args * grpc_channel_args_copy_and_add_and_remove(const grpc_channel_args *src, const char **to_remove, size_t num_to_remove, const grpc_arg *to_add, size_t num_to_add)
Copies the arguments from src except for those whose keys are in to_remove and appends the arguments ...
Definition: channel_args.cc:77
T * data()
Definition: inlined_vector.h:93
void reserve(size_t capacity)
Definition: inlined_vector.h:125
void emplace_back(Args &&... args)
Definition: inlined_vector.h:145
size_t size() const
Definition: inlined_vector.h:165
Definition: orphanable.h:77
void Unref()
Definition: orphanable.h:107
A proxy object implemented by the client channel and used by the LB policy to communicate with the ch...
Definition: lb_policy.h:260
virtual RefCountedPtr< SubchannelInterface > CreateSubchannel(const grpc_channel_args &args)=0
Creates a new subchannel with the specified channel args.
Interface for load balancing policies.
Definition: lb_policy.h:81
Definition: ref_counted_ptr.h:35
Definition: server_address.h:44
Definition: subchannel_list.h:77
SubchannelInterface * subchannel() const
Definition: subchannel_list.h:91
virtual ~SubchannelData()
Definition: subchannel_list.h:275
virtual void ProcessConnectivityChangeLocked(grpc_connectivity_state connectivity_state)=0
void ShutdownLocked()
Definition: subchannel_list.h:341
SubchannelListType * subchannel_list() const
Definition: subchannel_list.h:80
size_t Index() const
Definition: subchannel_list.h:85
SubchannelData(SubchannelList< SubchannelListType, SubchannelDataType > *subchannel_list, const ServerAddress &address, RefCountedPtr< SubchannelInterface > subchannel)
Definition: subchannel_list.h:264
grpc_connectivity_state CheckConnectivityStateLocked()
Definition: subchannel_list.h:97
void ResetBackoffLocked()
Definition: subchannel_list.h:297
void CancelConnectivityWatchLocked(const char *reason)
Definition: subchannel_list.h:325
void StartConnectivityWatchLocked()
Definition: subchannel_list.h:305
void CancelConnectivityStateWatch(const char *health_check_service_name, ConnectivityStateWatcherInterface *watcher)
Definition: subchannel.cc:809
static grpc_arg CreateSubchannelAddressArg(const grpc_resolved_address *addr)
Definition: subchannel.cc:841
void ResetBackoff()
Definition: subchannel.cc:829
void WatchConnectivityState(grpc_connectivity_state initial_state, grpc_core::UniquePtr< char > health_check_service_name, OrphanablePtr< ConnectivityStateWatcherInterface > watcher)
Definition: subchannel.cc:788
Definition: subchannel_interface.h:30
Definition: subchannel_list.h:173
bool shutting_down() const
Definition: subchannel_list.h:184
void ResetBackoffLocked()
Definition: subchannel_list.h:445
SubchannelDataType * subchannel(size_t index)
Definition: subchannel_list.h:181
TraceFlag * tracer() const
Definition: subchannel_list.h:188
LoadBalancingPolicy * policy() const
Definition: subchannel_list.h:187
virtual ~SubchannelList()
Definition: subchannel_list.h:422
SubchannelList(LoadBalancingPolicy *policy, TraceFlag *tracer, const ServerAddressList &addresses, LoadBalancingPolicy::ChannelControlHelper *helper, const grpc_channel_args &args)
Definition: subchannel_list.h:351
size_t num_subchannels() const
Definition: subchannel_list.h:178
void Orphan() override
Definition: subchannel_list.h:195
InlinedVector< SubchannelDataType, 10 > SubchannelVector
Definition: subchannel_list.h:175
Definition: trace.h:61
const char * name() const
Definition: trace.h:68
#define DEBUG_LOCATION
Definition: debug_location.h:41
#define GRPC_ARG_SERVICE_CONFIG
Service config data in JSON form.
Definition: grpc_types.h:291
#define GPR_ASSERT(x)
abort() the process if x is zero, having written a line to the log.
Definition: log.h:94
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
Log a message.
#define GPR_INFO
Definition: log.h:56
grpc_connectivity_state
Connectivity state of a channel.
Definition: connectivity_state.h:27
@ GRPC_CHANNEL_IDLE
channel is idle
Definition: connectivity_state.h:29
GPRAPI void gpr_free(void *ptr)
free
Definition: alloc.cc:50
Round Robin Policy.
Definition: backend_metric.cc:24
const char * ConnectivityStateName(grpc_connectivity_state state)
Definition: connectivity_state.cc:36
struct grpc_pollset_set grpc_pollset_set
Definition: pollset_set.h:31
char * grpc_sockaddr_to_uri(const grpc_resolved_address *resolved_addr)
Definition: sockaddr_utils.cc:219
An array of arguments that can be passed around.
Definition: grpc_types.h:132
#define GRPC_ARG_SUBCHANNEL_ADDRESS
Definition: subchannel.h:41
#define GRPC_TRACE_FLAG_ENABLED(f)
Definition: trace.h:112
#define GPR_ARRAY_SIZE(array)
Definition: useful.h:31