public class DistPipeFilter
extends java.lang.Object
is a communication endpoint for a distributed pipe of (text) filters. Given several filters that were set up with a FilterServiceFactory and are running on the network, this class contacts them and runs data through them in a given sequence.
Implementation Note: Rather than contacting each server one after another, the last server in the sequence is contacted and instructed to fetch the data from the second to last, and so on. In this way an arbitrarily long sequence of filters is set up. Finally, the first filter in the sequence connects back to the initiating DistPipeFilter to read the input.
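The back-to-front setup described in the note can be mimicked in memory. The following is only a sketch under stated assumptions: it uses no networking, and PullChainSketch, connect and demo are hypothetical names that are not part of this API. Each stage is connected starting from the last one, and the first stage finally reads from the initiator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

// Hypothetical in-memory analogy, not the library's implementation.
class PullChainSketch {
    /** Contacts stage `index` first, which in turn connects to stage `index - 1`, and so on. */
    static Supplier<String> connect(List<UnaryOperator<String>> filters, int index,
                                    Supplier<String> source, List<String> log) {
        if (index < 0) {
            // The first filter reads its input from the initiator.
            return () -> {
                log.add("read input");
                return source.get();
            };
        }
        log.add("contact filter " + index);
        Supplier<String> upstream = connect(filters, index - 1, source, log);
        UnaryOperator<String> filter = filters.get(index);
        return () -> filter.apply(upstream.get());
    }

    /** Sets up three stages, pulls the data through, and returns the event log plus the result. */
    static List<String> demo() {
        List<UnaryOperator<String>> filters = Arrays.asList(
            s -> s.trim(),
            s -> s.toUpperCase(),
            s -> s + "!");
        List<String> log = new ArrayList<>();
        Supplier<String> out = connect(filters, filters.size() - 1, () -> "  hello ", log);
        log.add(out.get());
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The log shows the stages being contacted in reverse order, while the data still flows through them from first to last.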
Objects of this class can be used by several threads in parallel. The maximum number of threads that will run in parallel is defined by the slots parameter of the constructor. If this number is exceeded, further threads are blocked until running threads finish.
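One common way to get this kind of slot limit is a counting semaphore. The sketch below assumes that approach; the source does not say how DistPipeFilter implements it, and SlotLimiterSketch with its methods is a hypothetical name used only for illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: limits concurrently running tasks to a fixed number of slots.
class SlotLimiterSketch {
    private final Semaphore slots;
    private final AtomicInteger running = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();

    SlotLimiterSketch(int slotCount) {
        this.slots = new Semaphore(slotCount);
    }

    /** Blocks while all slots are in use, then runs the task and frees the slot. */
    void run(Runnable task) throws InterruptedException {
        slots.acquire();
        try {
            int now = running.incrementAndGet();
            peak.accumulateAndGet(now, Math::max); // remember the highest concurrency seen
            task.run();
        } finally {
            running.decrementAndGet();
            slots.release();
        }
    }

    int peak() {
        return peak.get();
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Starts threadCount threads against slotCount slots and returns the peak concurrency. */
    static int demo(int slotCount, int threadCount) {
        SlotLimiterSketch limiter = new SlotLimiterSketch(slotCount);
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                try {
                    limiter.run(() -> sleepQuietly(20));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return limiter.peak();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + demo(2, 6));
    }
}
```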
The following steps are necessary to use objects of this class:
1. start() (necessary in only one thread) to start the internal server,
2. open() and close() to filter data,
3. shutdown() to stop the internal server.
Filter servers to contact can be easily set up by running a TcpServer with a FilterServiceFactory.
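As a hedged illustration of the start/open/close/shutdown order listed above, the sketch below runs the lifecycle against a fake endpoint. The Endpoint interface is a hypothetical stand-in and not part of this API: the real open() takes a PipelineRequest[] and a Feeder, whose construction is not described here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class LifecycleSketch {
    /** Hypothetical stand-in for the four lifecycle methods of DistPipeFilter. */
    interface Endpoint {
        void start();
        InputStream open() throws IOException; // assumption: arguments of the real open() omitted
        void close(InputStream is) throws IOException;
        void shutdown();
    }

    /** One filter operation: open, read everything, and always hand the stream back to close(). */
    static String filterOnce(Endpoint endpoint) throws IOException {
        InputStream is = endpoint.open();
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = is.read(chunk)) != -1) {
                buffer.write(chunk, 0, n);
            }
            return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
        } finally {
            endpoint.close(is);
        }
    }

    /** Runs the full lifecycle against a fake endpoint that records the call order. */
    static List<String> demo() {
        List<String> calls = new ArrayList<>();
        Endpoint fake = new Endpoint() {
            public void start() { calls.add("start"); }
            public InputStream open() {
                calls.add("open");
                return new ByteArrayInputStream("filtered".getBytes(StandardCharsets.UTF_8));
            }
            public void close(InputStream is) throws IOException {
                calls.add("close");
                is.close();
            }
            public void shutdown() { calls.add("shutdown"); }
        };
        fake.start();                       // step 1: once, before any open()
        try {
            calls.add("read:" + filterOnce(fake)); // step 2: open(), read, close()
        } catch (IOException e) {
            calls.add("error");
        } finally {
            fake.shutdown();                // step 3: stop the internal server
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Placing close() in a finally block keeps the connection resources from leaking when reading fails halfway.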
Constructor and Description |
---|
DistPipeFilter(int port, int slots) |
DistPipeFilter(int port, int slots, java.io.PrintStream logging) creates a DistPipeFilter on the given port. |
Modifier and Type | Method and Description |
---|---|
void | close(java.io.InputStream is) must eventually be called with an InputStream returned by open() to release the connection resources used to contact the pipeline. |
java.io.InputStream | open(PipelineRequest[] request, Feeder in) sets up the pipeline as determined by request to be fed by the given Feeder. |
void | shutdown() shuts down the TcpServer running on our behalf. |
void | start() starts serving. |
public DistPipeFilter(int port, int slots, java.io.PrintStream logging) throws java.io.IOException
creates a DistPipeFilter on the given port. Filters on the network will eventually connect back on this port to read their input data. Don't forget to call start() before calling one of the filter functions.
If the DistPipeFilter is no longer needed, it should be shut down.
Parameters:
port - may be 0 to ask ServerSocket for an arbitrary port.
logging - may be null to indicate that no logging is required.
Throws:
java.io.IOException
public DistPipeFilter(int port, int slots) throws java.io.IOException
Throws:
java.io.IOException
public void start()
starts serving. open() may be called only after the internal server has been started. The internal server runs in a daemon thread, i.e. it is terminated automatically when the JVM exits.
Calling this method when the server is already running has no effect.
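The two properties described for start(), a daemon server thread and no effect on repeated calls, can be sketched as follows. This is an assumption about one possible realization, not the actual code of this class; IdempotentStartSketch and its members are hypothetical names, and restarting after a shutdown is deliberately not modeled.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of an idempotent start() that launches a daemon thread.
class IdempotentStartSketch {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicInteger launches = new AtomicInteger();
    private volatile Thread server;

    void start() {
        // Only the first caller flips the flag; later calls return without doing anything.
        if (!started.compareAndSet(false, true)) {
            return;
        }
        launches.incrementAndGet();
        Thread t = new Thread(this::serve, "sketch-server");
        t.setDaemon(true); // a daemon thread does not keep the JVM alive
        server = t;
        t.start();
    }

    /** Placeholder for an accept loop; here the thread simply sleeps. */
    private void serve() {
        try {
            Thread.sleep(Long.MAX_VALUE);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    int launches() {
        return launches.get();
    }

    boolean serverIsDaemon() {
        Thread t = server;
        return t != null && t.isDaemon();
    }

    public static void main(String[] args) {
        IdempotentStartSketch sketch = new IdempotentStartSketch();
        sketch.start();
        sketch.start(); // no effect: the server thread is already running
        System.out.println("launches: " + sketch.launches());
    }
}
```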
public void shutdown()
shuts down the TcpServer running on our behalf. This does not affect filter operations that are already running. However, calls to one of the filter functions that are still waiting for a slot to become free will no longer succeed.
public java.io.InputStream open(PipelineRequest[] request, Feeder in) throws java.io.IOException
sets up the pipeline as determined by request to be fed by the given Feeder. The filtered data can be read from the returned InputStream. This InputStream must eventually be passed to close() to properly terminate the connection to the pipeline.
Hint: A Feeder can be easily implemented by subclassing AbstractPipe.
Throws:
java.io.IOException
public void close(java.io.InputStream is) throws java.io.IOException
must eventually be called with an InputStream returned by open() to release the connection resources used to contact the pipeline.
Throws:
java.lang.IllegalArgumentException - if is was either closed already or was never returned by open().
java.io.IOException - if closing the underlying socket connection throws this exception, or if the Feeder for this connection was never asked to deliver the data. The latter happens if the pipeline does not connect back to this class to fetch the data.
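The IllegalArgumentException contract of close() suggests that the streams handed out by open() are tracked somewhere. A minimal sketch of such bookkeeping is given below, assuming an identity-based set. OpenStreamRegistrySketch and its methods are hypothetical, and the Feeder check that leads to the IOException is not modeled.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical sketch: remembers which streams were handed out and are still open.
class OpenStreamRegistrySketch {
    private final Set<InputStream> open = Collections.synchronizedSet(
        Collections.newSetFromMap(new IdentityHashMap<InputStream, Boolean>()));

    /** Stand-in for open(): returns a stream and remembers it. */
    InputStream open(byte[] data) {
        InputStream is = new ByteArrayInputStream(data);
        open.add(is);
        return is;
    }

    /** Rejects streams that were closed already or never returned by open(). */
    void close(InputStream is) throws IOException {
        if (!open.remove(is)) {
            throw new IllegalArgumentException(
                "stream was closed already or was never returned by open()");
        }
        is.close();
    }

    /** Returns true if the given close attempt is rejected with IllegalArgumentException. */
    private static boolean isRejected(OpenStreamRegistrySketch registry, InputStream is) {
        try {
            registry.close(is);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Closing the same stream a second time is rejected. */
    static boolean rejectsSecondClose() {
        OpenStreamRegistrySketch registry = new OpenStreamRegistrySketch();
        InputStream is = registry.open(new byte[] {1, 2, 3});
        boolean firstRejected = isRejected(registry, is);
        boolean secondRejected = isRejected(registry, is);
        return !firstRejected && secondRejected;
    }

    /** A stream that did not come from open() is rejected. */
    static boolean rejectsForeignStream() {
        OpenStreamRegistrySketch registry = new OpenStreamRegistrySketch();
        return isRejected(registry, new ByteArrayInputStream(new byte[0]));
    }

    public static void main(String[] args) {
        System.out.println("second close rejected: " + rejectsSecondClose());
        System.out.println("foreign stream rejected: " + rejectsForeignStream());
    }
}
```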