public abstract class StreamBase extends MapReduceBase
Base class for StreamSource and StreamProcessor. Extends MapReduceBase and provides a more abstract streaming API.
Nested classes/interfaces inherited from interface Operation: Operation.OperationType
Modifier and Type | Field and Description |
---|---|
protected String | identifier |
protected Map<String,String> | incomingStreamTypes |
protected boolean | isBufferingAccess |
protected Map<String,String> | outGoingStreamTypes |
protected StreamDataset | streamDataset |
Inherited fields: defaultOutputStream, mapReduceDataset, MODULE
Constructor and Description |
---|
StreamBase() |
Modifier and Type | Method and Description |
---|---|
protected void | addStreamConsumer(ds.funnel.topic.Topic topic, StreamBase destination) |
protected abstract void | declareOutputStreams(): The classes that extend StreamProcessor and StreamSource should implement this method in order to declare output streams. |
protected void | declareStream(String streamId, String streamType): Declare an output stream from the Stream operator. |
protected String | getDatasetIdentifier() |
protected StreamDataset | getDefaultStreamDataset() |
protected String | getStreamIdentifier(String streamIdentifier) |
void | initialize(Properties providedDirectives): Initialize the operation based on the specified processing directives. |
void | marshall(ds.funnel.data.format.FormatWriter mFormat) |
void | unmarshall(ds.funnel.data.format.FormatReader mFormat): Unmarshall the operation from a set of marshalled bytes. |
protected void | writeToStream(String streamId, StreamEvent event): Write a stream data element to a Stream. |
Methods inherited from class MapReduceBase: addMap, addMapper, addReduce, addReducer, createResults, getAwaitingOutputsFrom, getDatasetCollection, getMapIdentifiers, getMapReduceDataset, getNumberOfMaps, getNumberOfReducers, getReduceIdentifiers, isAwaitingOutputs, removeMap, removeMapper, removeReduce, removeReducer, resetOutputTracker, setReceivedOutputFrom, writeResults, writeResults, writeResults, writeResults
Methods inherited from class OperationBase: changeToExecuteFixedNumberOfTimes, changeToExecuteOnce, changeToExecutePeriodically, changeToExecuteWhenDataAvailable, changeToStayAlive, dispose, execute, getDomain, getExecutionProfile, getInstanceIdentifier, getOperationIdentifier, getProcessingDirectives, getVersionInformation, hasDatasetCollection, hasExecutionProfile, hasProcessingDirectives, isInitialized, markInitializationAsComplete, needsInitialization, setAsExecuteContinuously, setAsExecuteFixedNumberOfTimes, setAsExecuteOnce, setAsExecutePeriodically, setAsExecuteWhenDataAvailable, setAsStayAlive, setDatasetCollection, setDomain, setInitializationComplete, setInitializationNeeded, setInstanceIdentifier, setOperationIdentifier, setProcessingDirectives, setTeminationConditionReached, setVersionInformation, terminationConditionReached, toString
Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface Operation: dispose, execute, getDomain, getExecutionProfile, getInstanceIdentifier, getOperationIdentifier, getProcessingDirectives, getVersionInformation, hasDatasetCollection, hasExecutionProfile, hasProcessingDirectives, isInitialized, needsInitialization, terminationConditionReached, toString
protected StreamDataset streamDataset
protected String identifier
protected boolean isBufferingAccess
public void initialize(Properties providedDirectives) throws ProcessingException
Description copied from interface: Operation
Initialize the operation based on the specified processing directives.
Specified by: initialize in interface Operation
Overrides: initialize in class OperationBase
Parameters:
providedDirectives - The specified processing directives.
Throws:
ProcessingException - If there are problems initializing based on the specified processing directives.
public void unmarshall(ds.funnel.data.format.FormatReader mFormat) throws ds.funnel.data.format.FormatException
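Description copied from interface: Operation
Unmarshall the operation from a set of marshalled bytes.
Specified by: unmarshall in interface Operation
Overrides: unmarshall in class MapReduceBase
Parameters:
mFormat - The byte[] representation of the operation.
Throws:
ds.funnel.data.format.FormatException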
public void marshall(ds.funnel.data.format.FormatWriter mFormat) throws ds.funnel.data.format.FormatException
Specified by: marshall in interface ds.funnel.data.format.FormatMarshaller
Overrides: marshall in class MapReduceBase
Throws:
ds.funnel.data.format.FormatException
protected abstract void declareOutputStreams() throws StreamingGraphConfigurationException
The classes that extend StreamProcessor and StreamSource should implement this method in order to declare output streams.
Throws:
StreamingGraphConfigurationException - Error in the stream processing graph.
protected String getDatasetIdentifier()
protected void declareStream(String streamId, String streamType) throws StreamingGraphConfigurationException
Declare an output stream from the Stream operator.
Parameters:
streamId - An identifier for the stream. This should be unique within a topology.
streamType - Type of the stream event, which should be an implementation of StreamEvent.
Throws:
StreamingGraphConfigurationException - Error in the stream processing graph.
protected void addStreamConsumer(ds.funnel.topic.Topic topic, StreamBase destination) throws StreamingGraphConfigurationException, StreamingDatasetException
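The declareStream parameter notes state two constraints: the stream identifier should be unique within a topology, and the stream type should name an implementation of StreamEvent. The following is a minimal sketch of how such checks could look, not the library's actual implementation. `SketchStreamEvent`, `TemperatureEvent`, and `StreamDeclarations` are hypothetical stand-ins invented for illustration, and `IllegalArgumentException` is used in place of StreamingGraphConfigurationException so the example compiles on its own.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the library's StreamEvent type.
interface SketchStreamEvent {}

// Hypothetical event type used only for this example.
final class TemperatureEvent implements SketchStreamEvent {}

// Hypothetical registry that applies the two documented constraints.
final class StreamDeclarations {
    // streamId -> fully qualified name of the declared event type
    private final Map<String, String> declared = new HashMap<>();

    void declare(String streamId, String streamType) {
        // Constraint 1: the identifier should be unique.
        if (declared.containsKey(streamId)) {
            throw new IllegalArgumentException("Stream already declared: " + streamId);
        }
        // Constraint 2: the type name should resolve to an event implementation.
        Class<?> type;
        try {
            type = Class.forName(streamType);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unknown stream type: " + streamType, e);
        }
        if (!SketchStreamEvent.class.isAssignableFrom(type)) {
            throw new IllegalArgumentException("Not a stream event type: " + streamType);
        }
        declared.put(streamId, streamType);
    }

    boolean isDeclared(String streamId) {
        return declared.containsKey(streamId);
    }
}

class DeclareStreamSketch {
    public static void main(String[] args) {
        StreamDeclarations declarations = new StreamDeclarations();
        declarations.declare("temperatures", TemperatureEvent.class.getName());
        try {
            // java.lang.String does not implement the event interface, so this is rejected.
            declarations.declare("raw", "java.lang.String");
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Passing the type as a class name mirrors the `String streamType` parameter in the documented signature. Whether the real declareStream resolves the class eagerly, as this sketch does, is an assumption.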
protected void writeToStream(String streamId, StreamEvent event) throws StreamingDatasetException
Write a stream data element to a Stream.
Parameters:
streamId - Identifier of the stream to which the data is written.
event - Stream data element.
Throws:
StreamingDatasetException - Error when writing to the stream.
protected StreamDataset getDefaultStreamDataset()
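The method details above imply a two-step pattern for subclasses: declare output streams in declareOutputStreams() by calling declareStream(...), then emit events with writeToStream(...). The sketch below illustrates that pattern under stated assumptions. It does not use the real library: `SketchEvent` and `SketchStreamBase` are hypothetical stand-ins that mimic only the three documented signatures, events are kept in an in-memory list rather than a StreamDataset, and `IllegalStateException` replaces the library's checked exceptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the library's StreamEvent type.
interface SketchEvent {}

// Hypothetical stand-in that mimics only the declare/write methods of StreamBase.
abstract class SketchStreamBase {
    // streamId -> declared event type name (field name taken from the field summary)
    protected final Map<String, String> outGoingStreamTypes = new HashMap<>();
    // streamId -> events written so far (in-memory substitute for a stream dataset)
    private final Map<String, List<SketchEvent>> written = new HashMap<>();

    protected abstract void declareOutputStreams();

    protected void declareStream(String streamId, String streamType) {
        if (outGoingStreamTypes.containsKey(streamId)) {
            throw new IllegalStateException("Stream already declared: " + streamId);
        }
        outGoingStreamTypes.put(streamId, streamType);
        written.put(streamId, new ArrayList<>());
    }

    protected void writeToStream(String streamId, SketchEvent event) {
        List<SketchEvent> sink = written.get(streamId);
        if (sink == null) {
            throw new IllegalStateException("Undeclared stream: " + streamId);
        }
        sink.add(event);
    }

    // Helper for the example only; not part of the documented API.
    int writtenCount(String streamId) {
        List<SketchEvent> sink = written.get(streamId);
        return sink == null ? 0 : sink.size();
    }
}

// Hypothetical event carrying a single word.
final class WordEvent implements SketchEvent {
    final String word;
    WordEvent(String word) { this.word = word; }
}

// Hypothetical source that declares one output stream and writes to it.
final class WordSource extends SketchStreamBase {
    static final String WORDS = "words";

    @Override
    protected void declareOutputStreams() {
        declareStream(WORDS, WordEvent.class.getName());
    }

    void emit(String word) {
        writeToStream(WORDS, new WordEvent(word));
    }
}

class StreamBaseSketch {
    public static void main(String[] args) {
        WordSource source = new WordSource();
        source.declareOutputStreams();
        source.emit("alpha");
        source.emit("beta");
        System.out.println(source.writtenCount(WordSource.WORDS)); // prints 2
    }
}
```

In this sketch the caller invokes declareOutputStreams() directly. When and by whom the real framework calls it is not stated in this page, so treat that ordering as an assumption.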
Copyright © 2015. All rights reserved.