18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
30 template <
class RequestType,
class ResponseType>
35 const RequestType*, ResponseType*)>
37 : get_reactor_(
std::move(get_reactor)) {}
42 allocator_ = allocator;
48 auto* allocator_state =
static_cast<
53 param.call->call(),
sizeof(ServerCallbackUnaryImpl)))
54 ServerCallbackUnaryImpl(
56 param.server_context),
57 param.call, allocator_state, std::move(param.call_requester));
58 param.server_context->BeginCompletionOp(
59 param.call, [call](
bool) { call->MaybeDone(); }, call);
62 if (param.status.ok()) {
63 reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
66 param.server_context),
70 if (reactor ==
nullptr) {
79 call->SetupReactor(reactor);
86 RequestType* request =
nullptr;
88 allocator_state =
nullptr;
89 if (allocator_ !=
nullptr) {
97 *handler_data = allocator_state;
98 request = allocator_state->
request();
112 const RequestType*, ResponseType*)>
115 allocator_ =
nullptr;
121 call_.call(), [
this](
bool) { MaybeDone(); }, &finish_ops_,
122 reactor_.load(std::memory_order_relaxed)->InternalInlineable());
123 finish_ops_.set_core_cq_tag(&finish_tag_);
125 if (!ctx_->sent_initial_metadata_) {
127 ctx_->initial_metadata_flags());
128 if (ctx_->compression_level_set()) {
129 finish_ops_.set_compression_level(ctx_->compression_level());
131 ctx_->sent_initial_metadata_ =
true;
135 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
136 finish_ops_.SendMessagePtr(response()));
138 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
140 finish_ops_.set_core_cq_tag(&finish_tag_);
141 call_.PerformOps(&finish_ops_);
144 void SendInitialMetadata()
override {
147 meta_tag_.Set(call_.call(),
149 reactor_.load(std::memory_order_relaxed)
150 ->OnSendInitialMetadataDone(ok);
154 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
155 ctx_->initial_metadata_flags());
156 if (ctx_->compression_level_set()) {
157 meta_ops_.set_compression_level(ctx_->compression_level());
159 ctx_->sent_initial_metadata_ =
true;
160 meta_ops_.set_core_cq_tag(&meta_tag_);
161 call_.PerformOps(&meta_ops_);
167 ServerCallbackUnaryImpl(
171 std::function<
void()> call_requester)
174 allocator_state_(allocator_state),
175 call_requester_(
std::move(call_requester)) {
176 ctx_->set_message_allocator_state(allocator_state);
184 reactor_.store(reactor, std::memory_order_relaxed);
190 const RequestType* request() {
return allocator_state_->request(); }
191 ResponseType* response() {
return allocator_state_->response(); }
193 void MaybeDone()
override {
195 reactor_.load(std::memory_order_relaxed)->OnDone();
197 auto call_requester = std::move(call_requester_);
198 allocator_state_->Release();
199 this->~ServerCallbackUnaryImpl();
205 ServerReactor* reactor()
override {
206 return reactor_.load(std::memory_order_relaxed);
222 std::function<void()> call_requester_;
233 std::atomic<ServerUnaryReactor*> reactor_;
235 std::atomic<intptr_t> callbacks_outstanding_{
240 template <
class RequestType,
class ResponseType>
247 : get_reactor_(
std::move(get_reactor)) {}
253 param.call->call(),
sizeof(ServerCallbackReaderImpl)))
254 ServerCallbackReaderImpl(
256 param.server_context),
257 param.call, std::move(param.call_requester));
258 param.server_context->BeginCompletionOp(
259 param.call, [reader](
bool) { reader->MaybeDone(); }, reader);
262 if (param.status.ok()) {
267 param.server_context),
271 if (reactor ==
nullptr) {
279 reader->SetupReactor(reactor);
283 std::function<ServerReadReactor<RequestType>*(
290 finish_tag_.Set(call_.call(), [
this](
bool) { MaybeDone(); }, &finish_ops_,
292 if (!ctx_->sent_initial_metadata_) {
294 ctx_->initial_metadata_flags());
295 if (ctx_->compression_level_set()) {
296 finish_ops_.set_compression_level(ctx_->compression_level());
298 ctx_->sent_initial_metadata_ =
true;
302 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
303 finish_ops_.SendMessagePtr(&resp_));
305 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
307 finish_ops_.set_core_cq_tag(&finish_tag_);
308 call_.PerformOps(&finish_ops_);
311 void SendInitialMetadata()
override {
314 meta_tag_.Set(call_.call(),
316 reactor_.load(std::memory_order_relaxed)
317 ->OnSendInitialMetadataDone(ok);
321 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
322 ctx_->initial_metadata_flags());
323 if (ctx_->compression_level_set()) {
324 meta_ops_.set_compression_level(ctx_->compression_level());
326 ctx_->sent_initial_metadata_ =
true;
327 meta_ops_.set_core_cq_tag(&meta_tag_);
328 call_.PerformOps(&meta_ops_);
331 void Read(RequestType* req)
override {
333 read_ops_.RecvMessage(req);
334 call_.PerformOps(&read_ops_);
342 std::function<
void()> call_requester)
343 : ctx_(ctx), call_(*call), call_requester_(
std::move(call_requester)) {}
345 void SetupReactor(ServerReadReactor<RequestType>* reactor) {
346 reactor_.store(reactor, std::memory_order_relaxed);
347 read_tag_.Set(call_.call(),
349 reactor_.load(std::memory_order_relaxed)->OnReadDone(ok);
353 read_ops_.set_core_cq_tag(&read_tag_);
359 ~ServerCallbackReaderImpl() {}
361 ResponseType* response() {
return &resp_; }
363 void MaybeDone()
override {
365 reactor_.load(std::memory_order_relaxed)->OnDone();
367 auto call_requester = std::move(call_requester_);
368 this->~ServerCallbackReaderImpl();
374 ServerReactor* reactor()
override {
375 return reactor_.load(std::memory_order_relaxed);
394 std::function<void()> call_requester_;
396 std::atomic<ServerReadReactor<RequestType>*> reactor_;
398 std::atomic<intptr_t> callbacks_outstanding_{
403 template <
class RequestType,
class ResponseType>
410 : get_reactor_(
std::move(get_reactor)) {}
416 param.call->call(),
sizeof(ServerCallbackWriterImpl)))
417 ServerCallbackWriterImpl(
419 param.server_context),
420 param.call,
static_cast<RequestType*
>(param.request),
421 std::move(param.call_requester));
422 param.server_context->BeginCompletionOp(
423 param.call, [writer](
bool) { writer->MaybeDone(); }, writer);
426 if (param.status.ok()) {
431 param.server_context),
434 if (reactor ==
nullptr) {
442 writer->SetupReactor(reactor);
451 call,
sizeof(RequestType))) RequestType();
458 request->~RequestType();
463 std::function<ServerWriteReactor<ResponseType>*(
470 finish_tag_.Set(call_.call(), [
this](
bool) { MaybeDone(); }, &finish_ops_,
472 finish_ops_.set_core_cq_tag(&finish_tag_);
474 if (!ctx_->sent_initial_metadata_) {
476 ctx_->initial_metadata_flags());
477 if (ctx_->compression_level_set()) {
478 finish_ops_.set_compression_level(ctx_->compression_level());
480 ctx_->sent_initial_metadata_ =
true;
482 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
483 call_.PerformOps(&finish_ops_);
486 void SendInitialMetadata()
override {
489 meta_tag_.Set(call_.call(),
491 reactor_.load(std::memory_order_relaxed)
492 ->OnSendInitialMetadataDone(ok);
496 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
497 ctx_->initial_metadata_flags());
498 if (ctx_->compression_level_set()) {
499 meta_ops_.set_compression_level(ctx_->compression_level());
501 ctx_->sent_initial_metadata_ =
true;
502 meta_ops_.set_core_cq_tag(&meta_tag_);
503 call_.PerformOps(&meta_ops_);
506 void Write(
const ResponseType* resp,
512 if (!ctx_->sent_initial_metadata_) {
513 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
514 ctx_->initial_metadata_flags());
515 if (ctx_->compression_level_set()) {
516 write_ops_.set_compression_level(ctx_->compression_level());
518 ctx_->sent_initial_metadata_ =
true;
522 call_.PerformOps(&write_ops_);
533 Finish(std::move(s));
541 const RequestType* req,
542 std::function<
void()> call_requester)
546 call_requester_(
std::move(call_requester)) {}
548 void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
549 reactor_.store(reactor, std::memory_order_relaxed);
553 reactor_.load(std::memory_order_relaxed)->OnWriteDone(ok);
557 write_ops_.set_core_cq_tag(&write_tag_);
562 ~ServerCallbackWriterImpl() { req_->~RequestType(); }
564 const RequestType* request() {
return req_; }
566 void MaybeDone()
override {
568 reactor_.load(std::memory_order_relaxed)->OnDone();
570 auto call_requester = std::move(call_requester_);
571 this->~ServerCallbackWriterImpl();
577 ServerReactor* reactor()
override {
578 return reactor_.load(std::memory_order_relaxed);
596 const RequestType* req_;
597 std::function<void()> call_requester_;
599 std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
601 std::atomic<intptr_t> callbacks_outstanding_{
606 template <
class RequestType,
class ResponseType>
613 : get_reactor_(
std::move(get_reactor)) {}
618 param.call->call(),
sizeof(ServerCallbackReaderWriterImpl)))
619 ServerCallbackReaderWriterImpl(
621 param.server_context),
622 param.call, std::move(param.call_requester));
623 param.server_context->BeginCompletionOp(
624 param.call, [stream](
bool) { stream->MaybeDone(); }, stream);
627 if (param.status.ok()) {
631 param.server_context));
634 if (reactor ==
nullptr) {
643 stream->SetupReactor(reactor);
647 std::function<ServerBidiReactor<RequestType, ResponseType>*(
651 class ServerCallbackReaderWriterImpl
655 finish_tag_.Set(call_.call(), [
this](
bool) { MaybeDone(); }, &finish_ops_,
657 finish_ops_.set_core_cq_tag(&finish_tag_);
659 if (!ctx_->sent_initial_metadata_) {
661 ctx_->initial_metadata_flags());
662 if (ctx_->compression_level_set()) {
663 finish_ops_.set_compression_level(ctx_->compression_level());
665 ctx_->sent_initial_metadata_ =
true;
667 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
668 call_.PerformOps(&finish_ops_);
671 void SendInitialMetadata()
override {
674 meta_tag_.Set(call_.call(),
676 reactor_.load(std::memory_order_relaxed)
677 ->OnSendInitialMetadataDone(ok);
681 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
682 ctx_->initial_metadata_flags());
683 if (ctx_->compression_level_set()) {
684 meta_ops_.set_compression_level(ctx_->compression_level());
686 ctx_->sent_initial_metadata_ =
true;
687 meta_ops_.set_core_cq_tag(&meta_tag_);
688 call_.PerformOps(&meta_ops_);
691 void Write(
const ResponseType* resp,
697 if (!ctx_->sent_initial_metadata_) {
698 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
699 ctx_->initial_metadata_flags());
700 if (ctx_->compression_level_set()) {
701 write_ops_.set_compression_level(ctx_->compression_level());
703 ctx_->sent_initial_metadata_ =
true;
707 call_.PerformOps(&write_ops_);
717 Finish(std::move(s));
720 void Read(RequestType* req)
override {
722 read_ops_.RecvMessage(req);
723 call_.PerformOps(&read_ops_);
731 std::function<
void()> call_requester)
732 : ctx_(ctx), call_(*call), call_requester_(
std::move(call_requester)) {}
734 void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
735 reactor_.store(reactor, std::memory_order_relaxed);
739 reactor_.load(std::memory_order_relaxed)->OnWriteDone(ok);
743 write_ops_.set_core_cq_tag(&write_tag_);
744 read_tag_.Set(call_.call(),
746 reactor_.load(std::memory_order_relaxed)->OnReadDone(ok);
750 read_ops_.set_core_cq_tag(&read_tag_);
756 void MaybeDone()
override {
758 reactor_.load(std::memory_order_relaxed)->OnDone();
760 auto call_requester = std::move(call_requester_);
761 this->~ServerCallbackReaderWriterImpl();
767 ServerReactor* reactor()
override {
768 return reactor_.load(std::memory_order_relaxed);
790 std::function<void()> call_requester_;
792 std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
794 std::atomic<intptr_t> callbacks_outstanding_{
802 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H