GRPC Core  9.0.0
byte_stream.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H
20 #define GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H
21 
23 
24 #include <grpc/slice_buffer.h>
27 
30 #define GRPC_WRITE_INTERNAL_COMPRESS (0x80000000u)
32 #define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS)
33 
34 namespace grpc_core {
35 
36 class ByteStream : public Orphanable {
37  public:
38  virtual ~ByteStream() {}
39 
40  // Returns true if the bytes are available immediately (in which case
41  // on_complete will not be called), or false if the bytes will be available
42  // asynchronously (in which case on_complete will be called when they
43  // are available). Should not be called if there is no data left on the
44  // stream.
45  //
46  // max_size_hint can be set as a hint as to the maximum number
47  // of bytes that would be acceptable to read.
48  virtual bool Next(size_t max_size_hint, grpc_closure* on_complete) = 0;
49 
50  // Returns the next slice in the byte stream when it is available, as
51  // indicated by Next().
52  //
53  // Once a slice is returned into *slice, it is owned by the caller.
54  virtual grpc_error* Pull(grpc_slice* slice) = 0;
55 
56  // Shuts down the byte stream.
57  //
58  // If there is a pending call to on_complete from Next(), it will be
59  // invoked with the error passed to Shutdown().
60  //
61  // The next call to Pull() (if any) will return the error passed to
62  // Shutdown().
63  virtual void Shutdown(grpc_error* error) = 0;
64 
65  uint32_t length() const { return length_; }
66  uint32_t flags() const { return flags_; }
67 
68  void set_flags(uint32_t flags) { flags_ = flags; }
69 
70  protected:
71  ByteStream(uint32_t length, uint32_t flags)
72  : length_(length), flags_(flags) {}
73 
74  private:
75  const uint32_t length_;
76  uint32_t flags_;
77 };
78 
79 //
80 // SliceBufferByteStream
81 //
82 // A ByteStream that wraps a slice buffer.
83 //
84 
86  public:
87  // Removes all slices in slice_buffer, leaving it empty.
88  SliceBufferByteStream(grpc_slice_buffer* slice_buffer, uint32_t flags);
89 
91 
92  void Orphan() override;
93 
94  bool Next(size_t max_size_hint, grpc_closure* on_complete) override;
95  grpc_error* Pull(grpc_slice* slice) override;
96  void Shutdown(grpc_error* error) override;
97 
98  private:
99  grpc_error* shutdown_error_ = GRPC_ERROR_NONE;
100  grpc_slice_buffer backing_buffer_;
101 };
102 
103 //
104 // CachingByteStream
105 //
106 // A ByteStream that that wraps an underlying byte stream but caches
107 // the resulting slices in a slice buffer. If an initial attempt fails
108 // without fully draining the underlying stream, a new caching stream
109 // can be created from the same underlying cache, in which case it will
110 // return whatever is in the backing buffer before continuing to read the
111 // underlying stream.
112 //
113 // NOTE: No synchronization is done, so it is not safe to have multiple
114 // CachingByteStreams simultaneously drawing from the same underlying
115 // ByteStreamCache at the same time.
116 //
117 
119  public:
120  class CachingByteStream : public ByteStream {
121  public:
122  explicit CachingByteStream(ByteStreamCache* cache);
123 
125 
126  void Orphan() override;
127 
128  bool Next(size_t max_size_hint, grpc_closure* on_complete) override;
129  grpc_error* Pull(grpc_slice* slice) override;
130  void Shutdown(grpc_error* error) override;
131 
132  // Resets the byte stream to the start of the underlying stream.
133  void Reset();
134 
135  private:
136  ByteStreamCache* cache_;
137  size_t cursor_ = 0;
138  size_t offset_ = 0;
139  grpc_error* shutdown_error_ = GRPC_ERROR_NONE;
140  };
141 
142  explicit ByteStreamCache(OrphanablePtr<ByteStream> underlying_stream);
143 
145 
146  // Must not be destroyed while still in use by a CachingByteStream.
147  void Destroy();
148 
149  grpc_slice_buffer* cache_buffer() { return &cache_buffer_; }
150 
151  private:
152  OrphanablePtr<ByteStream> underlying_stream_;
153  uint32_t length_;
154  uint32_t flags_;
155  grpc_slice_buffer cache_buffer_;
156 };
157 
158 } // namespace grpc_core
159 
160 #endif /* GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H */
void Shutdown(grpc_error *error) override
Definition: byte_stream.cc:145
~CachingByteStream()
Definition: byte_stream.cc:102
grpc_error * Pull(grpc_slice *slice) override
Definition: byte_stream.cc:120
bool Next(size_t max_size_hint, grpc_closure *on_complete) override
Definition: byte_stream.cc:112
void Reset()
Definition: byte_stream.cc:153
void Orphan() override
Definition: byte_stream.cc:104
CachingByteStream(ByteStreamCache *cache)
Definition: byte_stream.cc:99
Definition: byte_stream.h:118
ByteStreamCache(OrphanablePtr< ByteStream > underlying_stream)
Definition: byte_stream.cc:79
~ByteStreamCache()
Definition: byte_stream.cc:86
grpc_slice_buffer * cache_buffer()
Definition: byte_stream.h:149
void Destroy()
Definition: byte_stream.cc:88
Definition: byte_stream.h:36
virtual grpc_error * Pull(grpc_slice *slice)=0
ByteStream(uint32_t length, uint32_t flags)
Definition: byte_stream.h:71
virtual bool Next(size_t max_size_hint, grpc_closure *on_complete)=0
uint32_t flags() const
Definition: byte_stream.h:66
virtual ~ByteStream()
Definition: byte_stream.h:38
void set_flags(uint32_t flags)
Definition: byte_stream.h:68
virtual void Shutdown(grpc_error *error)=0
uint32_t length() const
Definition: byte_stream.h:65
Definition: orphanable.h:43
Definition: byte_stream.h:85
~SliceBufferByteStream()
Definition: byte_stream.cc:45
SliceBufferByteStream(grpc_slice_buffer *slice_buffer, uint32_t flags)
Definition: byte_stream.cc:37
void Shutdown(grpc_error *error) override
Definition: byte_stream.cc:70
grpc_error * Pull(grpc_slice *slice) override
Definition: byte_stream.cc:62
bool Next(size_t max_size_hint, grpc_closure *on_complete) override
Definition: byte_stream.cc:56
void Orphan() override
Definition: byte_stream.cc:47
grpc_closure on_complete
Definition: client_channel.cc:473
#define GRPC_ERROR_NONE
The following "special" errors can be propagated without allocating memory.
Definition: error.h:125
Round Robin Policy.
Definition: backend_metric.cc:24
std::unique_ptr< T, Deleter > OrphanablePtr
Definition: orphanable.h:68
A closure over a grpc_iomgr_cb_func.
Definition: closure.h:56
Definition: error_internal.h:39
Represents an expandable array of slices, to be interpreted as a single item.
Definition: slice.h:78
A grpc_slice s, if initialized, represents the byte range s.bytes[0..s.length-1].
Definition: slice.h:60