00001
00002
00003
00004
00005
00006
00007
00008 #include <botan/pipe.h>
00009 #include <botan/internal/out_buf.h>
00010 #include <botan/secqueue.h>
00011 #include <botan/parsing.h>
00012
00013 namespace Botan {
00014
00015 namespace {
00016
00017
00018
00019
00020 class Null_Filter : public Filter
00021 {
00022 public:
00023 void write(const byte input[], u32bit length)
00024 { send(input, length); }
00025
00026 std::string name() const { return "Null"; }
00027 };
00028
00029 }
00030
00031
00032
00033
00034 Pipe::Pipe(Filter* f1, Filter* f2, Filter* f3, Filter* f4)
00035 {
00036 init();
00037 append(f1);
00038 append(f2);
00039 append(f3);
00040 append(f4);
00041 }
00042
00043
00044
00045
00046 Pipe::Pipe(Filter* filter_array[], u32bit count)
00047 {
00048 init();
00049 for(u32bit j = 0; j != count; ++j)
00050 append(filter_array[j]);
00051 }
00052
00053
00054
00055
00056 Pipe::~Pipe()
00057 {
00058 destruct(pipe);
00059 delete outputs;
00060 }
00061
00062
00063
00064
00065 void Pipe::init()
00066 {
00067 outputs = new Output_Buffers;
00068 pipe = 0;
00069 default_read = 0;
00070 inside_msg = false;
00071 }
00072
00073
00074
00075
00076 void Pipe::reset()
00077 {
00078 if(inside_msg)
00079 throw Invalid_State("Pipe cannot be reset while it is processing");
00080 destruct(pipe);
00081 pipe = 0;
00082 inside_msg = false;
00083 }
00084
00085
00086
00087
00088 void Pipe::destruct(Filter* to_kill)
00089 {
00090 if(!to_kill || dynamic_cast<SecureQueue*>(to_kill))
00091 return;
00092 for(u32bit j = 0; j != to_kill->total_ports(); ++j)
00093 destruct(to_kill->next[j]);
00094 delete to_kill;
00095 }
00096
00097
00098
00099
00100 bool Pipe::end_of_data() const
00101 {
00102 return (remaining() == 0);
00103 }
00104
00105
00106
00107
00108 void Pipe::set_default_msg(message_id msg)
00109 {
00110 if(msg >= message_count())
00111 throw Invalid_Argument("Pipe::set_default_msg: msg number is too high");
00112 default_read = msg;
00113 }
00114
00115
00116
00117
00118 void Pipe::process_msg(const byte input[], u32bit length)
00119 {
00120 start_msg();
00121 write(input, length);
00122 end_msg();
00123 }
00124
00125
00126
00127
00128 void Pipe::process_msg(const MemoryRegion<byte>& input)
00129 {
00130 process_msg(input.begin(), input.size());
00131 }
00132
00133
00134
00135
00136 void Pipe::process_msg(const std::string& input)
00137 {
00138 process_msg(reinterpret_cast<const byte*>(input.data()), input.length());
00139 }
00140
00141
00142
00143
00144 void Pipe::process_msg(DataSource& input)
00145 {
00146 start_msg();
00147 write(input);
00148 end_msg();
00149 }
00150
00151
00152
00153
00154 void Pipe::start_msg()
00155 {
00156 if(inside_msg)
00157 throw Invalid_State("Pipe::start_msg: Message was already started");
00158 if(pipe == 0)
00159 pipe = new Null_Filter;
00160 find_endpoints(pipe);
00161 pipe->new_msg();
00162 inside_msg = true;
00163 }
00164
00165
00166
00167
00168 void Pipe::end_msg()
00169 {
00170 if(!inside_msg)
00171 throw Invalid_State("Pipe::end_msg: Message was already ended");
00172 pipe->finish_msg();
00173 clear_endpoints(pipe);
00174 if(dynamic_cast<Null_Filter*>(pipe))
00175 {
00176 delete pipe;
00177 pipe = 0;
00178 }
00179 inside_msg = false;
00180
00181 outputs->retire();
00182 }
00183
00184
00185
00186
00187 void Pipe::find_endpoints(Filter* f)
00188 {
00189 for(u32bit j = 0; j != f->total_ports(); ++j)
00190 if(f->next[j] && !dynamic_cast<SecureQueue*>(f->next[j]))
00191 find_endpoints(f->next[j]);
00192 else
00193 {
00194 SecureQueue* q = new SecureQueue;
00195 f->next[j] = q;
00196 outputs->add(q);
00197 }
00198 }
00199
00200
00201
00202
00203 void Pipe::clear_endpoints(Filter* f)
00204 {
00205 if(!f) return;
00206 for(u32bit j = 0; j != f->total_ports(); ++j)
00207 {
00208 if(f->next[j] && dynamic_cast<SecureQueue*>(f->next[j]))
00209 f->next[j] = 0;
00210 clear_endpoints(f->next[j]);
00211 }
00212 }
00213
00214
00215
00216
00217 void Pipe::append(Filter* filter)
00218 {
00219 if(inside_msg)
00220 throw Invalid_State("Cannot append to a Pipe while it is processing");
00221 if(!filter)
00222 return;
00223 if(dynamic_cast<SecureQueue*>(filter))
00224 throw Invalid_Argument("Pipe::append: SecureQueue cannot be used");
00225 if(filter->owned)
00226 throw Invalid_Argument("Filters cannot be shared among multiple Pipes");
00227
00228 filter->owned = true;
00229
00230 if(!pipe) pipe = filter;
00231 else pipe->attach(filter);
00232 }
00233
00234
00235
00236
00237 void Pipe::prepend(Filter* filter)
00238 {
00239 if(inside_msg)
00240 throw Invalid_State("Cannot prepend to a Pipe while it is processing");
00241 if(!filter)
00242 return;
00243 if(dynamic_cast<SecureQueue*>(filter))
00244 throw Invalid_Argument("Pipe::prepend: SecureQueue cannot be used");
00245 if(filter->owned)
00246 throw Invalid_Argument("Filters cannot be shared among multiple Pipes");
00247
00248 filter->owned = true;
00249
00250 if(pipe) filter->attach(pipe);
00251 pipe = filter;
00252 }
00253
00254
00255
00256
00257 void Pipe::pop()
00258 {
00259 if(inside_msg)
00260 throw Invalid_State("Cannot pop off a Pipe while it is processing");
00261
00262 if(!pipe)
00263 return;
00264
00265 if(pipe->total_ports() > 1)
00266 throw Invalid_State("Cannot pop off a Filter with multiple ports");
00267
00268 Filter* f = pipe;
00269 u32bit owns = f->owns();
00270 pipe = pipe->next[0];
00271 delete f;
00272
00273 while(owns--)
00274 {
00275 f = pipe;
00276 pipe = pipe->next[0];
00277 delete f;
00278 }
00279 }
00280
00281
00282
00283
00284 Pipe::message_id Pipe::message_count() const
00285 {
00286 return outputs->message_count();
00287 }
00288
00289
00290
00291
00292 const Pipe::message_id Pipe::LAST_MESSAGE =
00293 static_cast<Pipe::message_id>(-2);
00294
00295 const Pipe::message_id Pipe::DEFAULT_MESSAGE =
00296 static_cast<Pipe::message_id>(-1);
00297
00298 }