18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
30 template <
class RequestType,
class ResponseType>
35 const RequestType*, ResponseType*)>
37 : get_reactor_(
std::move(get_reactor)) {}
42 allocator_ = allocator;
48 auto* allocator_state =
static_cast<
53 param.call->call(),
sizeof(ServerCallbackUnaryImpl)))
54 ServerCallbackUnaryImpl(
56 param.server_context),
57 param.call, allocator_state, std::move(param.call_requester));
58 param.server_context->BeginCompletionOp(
59 param.call, [call](
bool) { call->MaybeDone(); }, call);
62 if (param.status.ok()) {
63 reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
66 param.server_context),
70 if (reactor ==
nullptr) {
79 call->SetupReactor(reactor);
86 RequestType* request =
nullptr;
88 allocator_state =
nullptr;
89 if (allocator_ !=
nullptr) {
97 *handler_data = allocator_state;
98 request = allocator_state->
request();
112 const RequestType*, ResponseType*)>
115 allocator_ =
nullptr;
121 call_.call(), [
this](
bool) { MaybeDone(); }, &finish_ops_,
122 reactor_.load(std::memory_order_relaxed)->InternalInlineable());
123 finish_ops_.set_core_cq_tag(&finish_tag_);
125 if (!ctx_->sent_initial_metadata_) {
127 ctx_->initial_metadata_flags());
128 if (ctx_->compression_level_set()) {
129 finish_ops_.set_compression_level(ctx_->compression_level());
131 ctx_->sent_initial_metadata_ =
true;
135 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
136 finish_ops_.SendMessagePtr(response()));
138 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
140 finish_ops_.set_core_cq_tag(&finish_tag_);
141 call_.PerformOps(&finish_ops_);
144 void SendInitialMetadata()
override {
147 meta_tag_.Set(call_.call(),
149 reactor_.load(std::memory_order_relaxed)
150 ->OnSendInitialMetadataDone(ok);
154 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
155 ctx_->initial_metadata_flags());
156 if (ctx_->compression_level_set()) {
157 meta_ops_.set_compression_level(ctx_->compression_level());
159 ctx_->sent_initial_metadata_ =
true;
160 meta_ops_.set_core_cq_tag(&meta_tag_);
161 call_.PerformOps(&meta_ops_);
167 ServerCallbackUnaryImpl(
171 std::function<
void()> call_requester)
174 allocator_state_(allocator_state),
175 call_requester_(
std::move(call_requester)) {
176 ctx_->set_message_allocator_state(allocator_state);
184 reactor_.store(reactor, std::memory_order_relaxed);
190 const RequestType* request() {
return allocator_state_->request(); }
191 ResponseType* response() {
return allocator_state_->response(); }
193 void MaybeDone()
override {
195 reactor_.load(std::memory_order_relaxed)->OnDone();
197 auto call_requester = std::move(call_requester_);
198 allocator_state_->Release();
199 this->~ServerCallbackUnaryImpl();
205 ServerReactor* reactor()
override {
206 return reactor_.load(std::memory_order_relaxed);
222 std::function<void()> call_requester_;
233 std::atomic<ServerUnaryReactor*> reactor_;
235 std::atomic<intptr_t> callbacks_outstanding_{
240 template <
class RequestType,
class ResponseType>
247 : get_reactor_(
std::move(get_reactor)) {}
253 param.call->call(),
sizeof(ServerCallbackReaderImpl)))
254 ServerCallbackReaderImpl(
256 param.server_context),
257 param.call, std::move(param.call_requester));
258 param.server_context->BeginCompletionOp(
259 param.call, [reader](
bool) { reader->MaybeDone(); }, reader);
262 if (param.status.ok()) {
267 param.server_context),
271 if (reactor ==
nullptr) {
279 reader->SetupReactor(reactor);
283 std::function<ServerReadReactor<RequestType>*(
290 finish_tag_.Set(call_.call(), [
this](
bool) { MaybeDone(); }, &finish_ops_,
292 if (!ctx_->sent_initial_metadata_) {
294 ctx_->initial_metadata_flags());
295 if (ctx_->compression_level_set()) {
296 finish_ops_.set_compression_level(ctx_->compression_level());
298 ctx_->sent_initial_metadata_ =
true;
302 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
303 finish_ops_.SendMessagePtr(&resp_));
305 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
307 finish_ops_.set_core_cq_tag(&finish_tag_);
308 call_.PerformOps(&finish_ops_);
311 void SendInitialMetadata()
override {
314 meta_tag_.Set(call_.call(),
316 reactor_.load(std::memory_order_relaxed)
317 ->OnSendInitialMetadataDone(ok);
321 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
322 ctx_->initial_metadata_flags());
323 if (ctx_->compression_level_set()) {
324 meta_ops_.set_compression_level(ctx_->compression_level());
326 ctx_->sent_initial_metadata_ =
true;
327 meta_ops_.set_core_cq_tag(&meta_tag_);
328 call_.PerformOps(&meta_ops_);
331 void Read(RequestType* req)
override {
333 read_ops_.RecvMessage(req);
334 call_.PerformOps(&read_ops_);
342 std::function<
void()> call_requester)
343 : ctx_(ctx), call_(*call), call_requester_(
std::move(call_requester)) {}
345 void SetupReactor(ServerReadReactor<RequestType>* reactor) {
346 reactor_.store(reactor, std::memory_order_relaxed);
347 read_tag_.Set(call_.call(),
349 reactor_.load(std::memory_order_relaxed)->OnReadDone(ok);
353 read_ops_.set_core_cq_tag(&read_tag_);
359 ~ServerCallbackReaderImpl() {}
361 ResponseType* response() {
return &resp_; }
363 void MaybeDone()
override {
365 reactor_.load(std::memory_order_relaxed)->OnDone();
367 auto call_requester = std::move(call_requester_);
368 this->~ServerCallbackReaderImpl();
374 ServerReactor* reactor()
override {
375 return reactor_.load(std::memory_order_relaxed);
394 std::function<void()> call_requester_;
396 std::atomic<ServerReadReactor<RequestType>*> reactor_;
398 std::atomic<intptr_t> callbacks_outstanding_{
403 template <
class RequestType,
class ResponseType>
410 : get_reactor_(
std::move(get_reactor)) {}
416 param.call->call(),
sizeof(ServerCallbackWriterImpl)))
417 ServerCallbackWriterImpl(
419 param.server_context),
420 param.call,
static_cast<RequestType*
>(param.request),
421 std::move(param.call_requester));
422 param.server_context->BeginCompletionOp(
423 param.call, [writer](
bool) { writer->MaybeDone(); }, writer);
426 if (param.status.ok()) {
431 param.server_context),
434 if (reactor ==
nullptr) {
442 writer->SetupReactor(reactor);
451 call,
sizeof(RequestType))) RequestType();
458 request->~RequestType();
463 std::function<ServerWriteReactor<ResponseType>*(
470 finish_tag_.Set(call_.call(), [
this](
bool) { MaybeDone(); }, &finish_ops_,
472 finish_ops_.set_core_cq_tag(&finish_tag_);
474 if (!ctx_->sent_initial_metadata_) {
476 ctx_->initial_metadata_flags());
477 if (ctx_->compression_level_set()) {
478 finish_ops_.set_compression_level(ctx_->compression_level());
480 ctx_->sent_initial_metadata_ =
true;
482 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
483 call_.PerformOps(&finish_ops_);
486 void SendInitialMetadata()
override {
489 meta_tag_.Set(call_.call(),
491 reactor_.load(std::memory_order_relaxed)
492 ->OnSendInitialMetadataDone(ok);
496 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
497 ctx_->initial_metadata_flags());
498 if (ctx_->compression_level_set()) {
499 meta_ops_.set_compression_level(ctx_->compression_level());
501 ctx_->sent_initial_metadata_ =
true;
502 meta_ops_.set_core_cq_tag(&meta_tag_);
503 call_.PerformOps(&meta_ops_);
506 void Write(
const ResponseType* resp,
512 if (!ctx_->sent_initial_metadata_) {
513 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
514 ctx_->initial_metadata_flags());
515 if (ctx_->compression_level_set()) {
516 write_ops_.set_compression_level(ctx_->compression_level());
518 ctx_->sent_initial_metadata_ =
true;
522 call_.PerformOps(&write_ops_);
533 Finish(std::move(s));
541 const RequestType* req,
542 std::function<
void()> call_requester)
546 call_requester_(
std::move(call_requester)) {}
548 void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
549 reactor_.store(reactor, std::memory_order_relaxed);
553 reactor_.load(std::memory_order_relaxed)->OnWriteDone(ok);
557 write_ops_.set_core_cq_tag(&write_tag_);
562 ~ServerCallbackWriterImpl() { req_->~RequestType(); }
564 const RequestType* request() {
return req_; }
566 void MaybeDone()
override {
568 reactor_.load(std::memory_order_relaxed)->OnDone();
570 auto call_requester = std::move(call_requester_);
571 this->~ServerCallbackWriterImpl();
577 ServerReactor* reactor()
override {
578 return reactor_.load(std::memory_order_relaxed);
596 const RequestType* req_;
597 std::function<void()> call_requester_;
599 std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
601 std::atomic<intptr_t> callbacks_outstanding_{
606 template <
class RequestType,
class ResponseType>
613 : get_reactor_(
std::move(get_reactor)) {}
618 param.call->call(),
sizeof(ServerCallbackReaderWriterImpl)))
619 ServerCallbackReaderWriterImpl(
621 param.server_context),
622 param.call, std::move(param.call_requester));
623 param.server_context->BeginCompletionOp(
624 param.call, [stream](
bool) { stream->MaybeDone(); }, stream);
627 if (param.status.ok()) {
631 param.server_context));
634 if (reactor ==
nullptr) {
643 stream->SetupReactor(reactor);
647 std::function<ServerBidiReactor<RequestType, ResponseType>*(
651 class ServerCallbackReaderWriterImpl
655 finish_tag_.Set(call_.call(), [
this](
bool) { MaybeDone(); }, &finish_ops_,
657 finish_ops_.set_core_cq_tag(&finish_tag_);
659 if (!ctx_->sent_initial_metadata_) {
661 ctx_->initial_metadata_flags());
662 if (ctx_->compression_level_set()) {
663 finish_ops_.set_compression_level(ctx_->compression_level());
665 ctx_->sent_initial_metadata_ =
true;
667 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
668 call_.PerformOps(&finish_ops_);
671 void SendInitialMetadata()
override {
674 meta_tag_.Set(call_.call(),
676 reactor_.load(std::memory_order_relaxed)
677 ->OnSendInitialMetadataDone(ok);
681 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
682 ctx_->initial_metadata_flags());
683 if (ctx_->compression_level_set()) {
684 meta_ops_.set_compression_level(ctx_->compression_level());
686 ctx_->sent_initial_metadata_ =
true;
687 meta_ops_.set_core_cq_tag(&meta_tag_);
688 call_.PerformOps(&meta_ops_);
691 void Write(
const ResponseType* resp,
697 if (!ctx_->sent_initial_metadata_) {
698 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
699 ctx_->initial_metadata_flags());
700 if (ctx_->compression_level_set()) {
701 write_ops_.set_compression_level(ctx_->compression_level());
703 ctx_->sent_initial_metadata_ =
true;
707 call_.PerformOps(&write_ops_);
717 Finish(std::move(s));
720 void Read(RequestType* req)
override {
722 read_ops_.RecvMessage(req);
723 call_.PerformOps(&read_ops_);
731 std::function<
void()> call_requester)
732 : ctx_(ctx), call_(*call), call_requester_(
std::move(call_requester)) {}
734 void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
735 reactor_.store(reactor, std::memory_order_relaxed);
739 reactor_.load(std::memory_order_relaxed)->OnWriteDone(ok);
743 write_ops_.set_core_cq_tag(&write_tag_);
744 read_tag_.Set(call_.call(),
746 reactor_.load(std::memory_order_relaxed)->OnReadDone(ok);
750 read_ops_.set_core_cq_tag(&read_tag_);
756 void MaybeDone()
override {
758 reactor_.load(std::memory_order_relaxed)->OnDone();
760 auto call_requester = std::move(call_requester_);
761 this->~ServerCallbackReaderWriterImpl();
767 ServerReactor* reactor()
override {
768 return reactor_.load(std::memory_order_relaxed);
790 std::function<void()> call_requester_;
792 std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
794 std::atomic<intptr_t> callbacks_outstanding_{
A sequence of bytes.
Definition: byte_buffer.h:67
void Release()
Forget underlying byte buffer without destroying Use this only for un-owned byte buffers.
Definition: byte_buffer.h:146
virtual void grpc_call_unref(grpc_call *call)=0
virtual void grpc_call_ref(grpc_call *call)=0
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
Did it work? If it didn't, why?
Definition: status.h:31
bool ok() const
Is the status OK?
Definition: status.h:118
Per-message write options.
Definition: call_op_set.h:79
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:186
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:122
virtual MessageHolder< RequestT, ResponseT > * AllocateMessages()=0
ResponseT * response()
Definition: message_allocator.h:47
RequestT * request()
Definition: message_allocator.h:46
Straightforward wrapping of the C call object.
Definition: call.h:38
grpc_call * call() const
Definition: call.h:72
Definition: call_op_set.h:422
Definition: call_op_set.h:286
Definition: call_op_set.h:627
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:839
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:137
Base class for running an RPC handler.
Definition: rpc_service_method.h:41
Definition: server_context_impl.h:528
ServerBidiReactor is the interface for a bidirectional streaming RPC.
Definition: server_callback_impl.h:223
Definition: server_callback_impl.h:167
void BindReactor(ServerReadReactor< RequestType > *reactor)
Definition: server_callback_impl.h:175
virtual void SendInitialMetadata()=0
Definition: server_callback_impl.h:198
virtual void SendInitialMetadata()=0
void BindReactor(ServerBidiReactor< RequestType, ResponseType > *reactor)
Definition: server_callback_impl.h:210
Definition: server_callback_impl.h:151
virtual void SendInitialMetadata()=0
void BindReactor(Reactor *reactor)
Definition: server_callback_impl.h:161
Definition: server_callback_impl.h:181
void BindReactor(ServerWriteReactor< ResponseType > *reactor)
Definition: server_callback_impl.h:192
virtual void SendInitialMetadata()=0
ServerReadReactor is the interface for a client-streaming RPC.
Definition: server_callback_impl.h:449
Definition: server_callback_impl.h:660
ServerWriteReactor is the interface for a server-streaming RPC.
Definition: server_callback_impl.h:537
Definition: server_callback_handlers.h:607
CallbackBidiHandler(std::function< ServerBidiReactor< RequestType, ResponseType > *(::grpc_impl::CallbackServerContext *)> get_reactor)
Definition: server_callback_handlers.h:609
void RunHandler(const HandlerParameter ¶m) final
Definition: server_callback_handlers.h:614
Definition: server_callback_handlers.h:241
CallbackClientStreamingHandler(std::function< ServerReadReactor< RequestType > *(::grpc_impl::CallbackServerContext *, ResponseType *)> get_reactor)
Definition: server_callback_handlers.h:243
void RunHandler(const HandlerParameter ¶m) final
Definition: server_callback_handlers.h:248
Definition: server_callback_handlers.h:404
void RunHandler(const HandlerParameter ¶m) final
Definition: server_callback_handlers.h:411
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, ::grpc::Status *status, void **) final
Definition: server_callback_handlers.h:445
CallbackServerStreamingHandler(std::function< ServerWriteReactor< ResponseType > *(::grpc_impl::CallbackServerContext *, const RequestType *)> get_reactor)
Definition: server_callback_handlers.h:406
Definition: server_callback_handlers.h:31
void RunHandler(const HandlerParameter ¶m) final
Definition: server_callback_handlers.h:45
CallbackUnaryHandler(std::function< ServerUnaryReactor *(::grpc_impl::CallbackServerContext *, const RequestType *, ResponseType *)> get_reactor)
Definition: server_callback_handlers.h:33
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, ::grpc::Status *status, void **handler_data) final
Definition: server_callback_handlers.h:82
void SetMessageAllocator(::grpc::experimental::MessageAllocator< RequestType, ResponseType > *allocator)
Definition: server_callback_handlers.h:39
Definition: server_callback_impl.h:121
Definition: server_callback_impl.h:727
int Unref()
Decreases the reference count and returns the previous value.
Definition: server_callback_impl.h:102
void MaybeCallOnCancel(ServerReactor *reactor)
Definition: server_callback_impl.h:84
void Ref()
Increases the reference count.
Definition: server_callback_impl.h:99
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:70
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:146
::grpc_impl::ServerUnaryReactor ServerUnaryReactor
Definition: server_callback.h:35
Reactor * CatchingReactorGetter(Func &&func, Args &&... args)
Definition: callback_common.h:51
FinishOnlyReactor< ServerUnaryReactor > UnimplementedUnaryReactor
Definition: server_callback_impl.h:733
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm_impl.h:33
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue_impl.h:90
@ UNIMPLEMENTED
Operation is not implemented or not supported/enabled in this service.
Definition: status_code_enum.h:115
Definition: async_unary_call_impl.h:301
Definition: rpc_service_method.h:44
Definition: grpc_types.h:40