Class ProcessingUnit
java.lang.Object
  java.lang.Thread
    org.apache.uima.collection.impl.cpm.engine.ProcessingUnit
All Implemented Interfaces:
java.lang.Runnable
public class ProcessingUnit extends java.lang.Thread
This component executes the processing pipeline. Running in a separate thread, it continuously reads bundles of Cas from the Work Queue filled by ArtifactProducer and sends them through the configured CasProcessors. The sequence in which CasProcessors are invoked is defined by the order in which the Cas Processors are listed in the CPE descriptor. The results of analysis produced by the Cas Processors are enqueued onto an output queue that is shared with Cas Consumers.
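The read, process, enqueue cycle described above can be sketched with standard java.util.concurrent queues. This is a minimal illustration, not the UIMA implementation: PipelineWorkerSketch, the END_OF_INPUT sentinel, and the use of String arrays in place of Cas bundles are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a processing unit; none of these names are UIMA API.
class PipelineWorkerSketch extends Thread {
    // Sentinel bundle that tells the worker to stop (an assumption of this sketch).
    static final String[] END_OF_INPUT = new String[0];

    private final BlockingQueue<String[]> workQueue;
    private final BlockingQueue<String[]> outputQueue;
    private final List<UnaryOperator<String>> processors;

    PipelineWorkerSketch(BlockingQueue<String[]> workQueue,
                         BlockingQueue<String[]> outputQueue,
                         List<UnaryOperator<String>> processors) {
        this.workQueue = workQueue;
        this.outputQueue = outputQueue;
        this.processors = processors;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String[] bundle = workQueue.take();      // blocks until a bundle arrives
                if (bundle == END_OF_INPUT) {
                    outputQueue.put(END_OF_INPUT);       // pass the stop signal downstream
                    return;
                }
                // Each processor sees the whole bundle, in the configured order.
                for (UnaryOperator<String> processor : processors) {
                    for (int i = 0; i < bundle.length; i++) {
                        bundle[i] = processor.apply(bundle[i]);
                    }
                }
                outputQueue.put(bundle);                 // hand results to the consumers
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();          // preserve the interrupt status
        }
    }

    // Runs one bundle through a two-stage pipeline and returns the processed bundle.
    static String[] demo() {
        BlockingQueue<String[]> in = new LinkedBlockingQueue<>();
        BlockingQueue<String[]> out = new LinkedBlockingQueue<>();
        List<UnaryOperator<String>> stages = new ArrayList<>();
        stages.add(String::trim);
        stages.add(String::toUpperCase);
        PipelineWorkerSketch worker = new PipelineWorkerSketch(in, out, stages);
        worker.start();
        try {
            in.put(new String[] {" doc one ", "doc two"});
            in.put(END_OF_INPUT);
            String[] result = out.take();
            worker.join();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Calling PipelineWorkerSketch.demo() starts the worker, feeds it one bundle followed by the sentinel, and returns the bundle after both stages have run in list order.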
-
-
Field Summary
Fields (Modifier and Type, Field)
protected java.lang.Object[] artifact
protected CAS[] casList
protected CPECasPool casPool
protected CAS conversionCas
protected CAS[] conversionCasArray
protected CpeConfiguration cpeConfiguration
protected CPMEngine cpm
protected CasConverter mConverter
protected boolean notifyListeners
protected long numToProcess
protected BoundedWorkQueue outputQueue
protected java.util.LinkedList processContainers
protected ProcessTrace processingUnitProcessTrace
protected boolean releaseCAS
protected java.util.ArrayList statusCbL
protected java.lang.String threadId
int threadState
protected UimaTimer timer
long timer01
long timer02
long timer03
long timer04
long timer05
long timer06
protected BoundedWorkQueue workQueue
-
Constructor Summary
Constructors (Constructor - Description)
ProcessingUnit()
ProcessingUnit(CPMEngine acpm)
ProcessingUnit(CPMEngine acpm, BoundedWorkQueue aInputQueue, BoundedWorkQueue aOutputQueue) - Initialize the PU
-
Method Summary
All Methods, Instance Methods, Concrete Methods (Modifier and Type, Method - Description)
void addStatusCallbackListener(BaseStatusCallbackListener aListener) - Plugs in Listener object used for notifications.
protected boolean analyze(java.lang.Object[] aCasObjectList, ProcessTrace pTrTemp) - An alternate processing loop designed for the single-threaded CPM.
void cleanup() - Null out fields of this object.
boolean consumeQueue() - Consumes the input queue to make sure all bundles still there get processed before the CPE terminates.
void disableCasProcessor(int aCasProcessorIndex) - Disable a CASProcessor in the processing pipeline.
void disableCasProcessor(java.lang.String aCasProcessorName) - Alternative method to disable a Cas Processor.
protected void doNotifyListeners(java.lang.Object aCas, boolean isCasObject, EntityProcessStatus aEntityProcStatus) - Notifies all configured listeners.
void enableCasProcessor(java.lang.String aCasProcessorName) - Enables the Cas Processor with a given name.
protected boolean endOfProcessingReached(long aCount) - Returns true if the CPM has finished analyzing the collection.
protected long getBytes(java.lang.Object aCas) - Returns the size of the CAS object.
java.util.ArrayList getCallbackListeners() - Returns list of listeners used by this PU for callbacks.
boolean isCasConsumerPipeline()
protected boolean isProcessorReady(int aStatus) - Check if the CASProcessor status is available for processing.
boolean isRunning() - Returns true if this component is in running state.
protected void notifyListeners(java.lang.Object aCas, boolean isCasObject, EntityProcessStatus aEntityProcStatus) - Notifies Listeners of the fact that the pipeline has finished processing the current set of CASes.
protected void process(java.lang.Object anArtifact)
protected boolean processNext(java.lang.Object[] aCasObjectList, ProcessTrace pTrTemp) - Executes the processing pipeline.
void removeStatusCallbackListener(BaseStatusCallbackListener aListener) - Removes given listener from the list of listeners.
void run() - Starts the Processing Pipeline thread.
void setCasConsumerPipelineIdentity() - Define a CasConsumer Pipeline identity for this instance.
void setCasPool(CPECasPool aPool)
void setContainers(java.util.LinkedList processorList) - Plugs in a list of Cas Processor containers.
void setCPMEngine(CPMEngine acpm) - Alternative method of providing the reference to the component managing the lifecycle of the CPE.
void setInputQueue(BoundedWorkQueue aInputQueue) - Alternative method of providing a queue from which this PU will read bundles of Cas.
void setNotifyListeners(boolean aDoNotify) - Set a flag indicating if notifications should be made via configured Listeners.
void setOutputQueue(BoundedWorkQueue aOutputQueue) - Alternative method of providing a queue where this PU will deposit results of analysis.
void setProcessingUnitProcessTrace(ProcessTrace aProcessingUnitProcessTrace) - Plugs in ProcessTrace object used to collect statistics.
void setReleaseCASFlag(boolean aFlag) - Called by the CPMEngine during setup to indicate that this thread is supposed to release a CAS at the end of processing.
void setUimaTimer(UimaTimer aTimer) - Plugs in custom timer used by the PU for getting time.
protected void showMetadata(java.lang.Object[] aCasList)
void stopCasProcessors(boolean kill) - Stops all Cas Processors that are part of this PU.
-
Methods inherited from class java.lang.Thread
activeCount, checkAccess, clone, countStackFrames, currentThread, dumpStack, enumerate, getAllStackTraces, getContextClassLoader, getDefaultUncaughtExceptionHandler, getId, getName, getPriority, getStackTrace, getState, getThreadGroup, getUncaughtExceptionHandler, holdsLock, interrupt, interrupted, isAlive, isDaemon, isInterrupted, join, join, join, onSpinWait, resume, setContextClassLoader, setDaemon, setDefaultUncaughtExceptionHandler, setName, setPriority, setUncaughtExceptionHandler, sleep, sleep, start, stop, suspend, toString, yield
-
-
-
-
Field Detail
-
threadState
public int threadState
-
casPool
protected CPECasPool casPool
-
releaseCAS
protected boolean releaseCAS
-
cpm
protected CPMEngine cpm
-
workQueue
protected BoundedWorkQueue workQueue
-
outputQueue
protected BoundedWorkQueue outputQueue
-
mConverter
protected CasConverter mConverter
-
processingUnitProcessTrace
protected ProcessTrace processingUnitProcessTrace
-
processContainers
protected java.util.LinkedList processContainers
-
numToProcess
protected long numToProcess
-
casList
protected CAS[] casList
-
statusCbL
protected java.util.ArrayList statusCbL
-
notifyListeners
protected boolean notifyListeners
-
conversionCas
protected CAS conversionCas
-
artifact
protected java.lang.Object[] artifact
-
conversionCasArray
protected CAS[] conversionCasArray
-
timer
protected UimaTimer timer
-
threadId
protected java.lang.String threadId
-
cpeConfiguration
protected CpeConfiguration cpeConfiguration
-
timer01
public long timer01
-
timer02
public long timer02
-
timer03
public long timer03
-
timer04
public long timer04
-
timer05
public long timer05
-
timer06
public long timer06
-
-
Constructor Detail
-
ProcessingUnit
public ProcessingUnit()
-
ProcessingUnit
public ProcessingUnit(CPMEngine acpm, BoundedWorkQueue aInputQueue, BoundedWorkQueue aOutputQueue)
Initialize the PU
Parameters:
acpm - component managing life cycle of the CPE
aInputQueue - queue to read from
aOutputQueue - queue to write to
-
ProcessingUnit
public ProcessingUnit(CPMEngine acpm)
-
-
Method Detail
-
isRunning
public boolean isRunning()
Returns true if this component is in running state.
Returns:
true if running, false otherwise
-
setCasConsumerPipelineIdentity
public void setCasConsumerPipelineIdentity()
Define a CasConsumer Pipeline identity for this instance
-
isCasConsumerPipeline
public boolean isCasConsumerPipeline()
-
setInputQueue
public void setInputQueue(BoundedWorkQueue aInputQueue)
Alternative method of providing a queue from which this PU will read bundles of Cas
Parameters:
aInputQueue - read queue
-
setOutputQueue
public void setOutputQueue(BoundedWorkQueue aOutputQueue)
Alternative method of providing a queue where this PU will deposit results of analysis
Parameters:
aOutputQueue - queue to write to
-
setCPMEngine
public void setCPMEngine(CPMEngine acpm)
Alternative method of providing the reference to the component managing the lifecycle of the CPE
Parameters:
acpm - reference to the controlling engine
-
cleanup
public void cleanup()
Null out fields of this object. Call this only when this object is no longer needed.
-
setNotifyListeners
public void setNotifyListeners(boolean aDoNotify)
Set a flag indicating if notifications should be made via configured Listeners
Parameters:
aDoNotify - true if notification is required, false otherwise
-
addStatusCallbackListener
public void addStatusCallbackListener(BaseStatusCallbackListener aListener)
Plugs in Listener object used for notifications.
Parameters:
aListener - BaseStatusCallbackListener instance
-
getCallbackListeners
public java.util.ArrayList getCallbackListeners()
Returns list of listeners used by this PU for callbacks.
Returns:
list of BaseStatusCallbackListener instances
-
removeStatusCallbackListener
public void removeStatusCallbackListener(BaseStatusCallbackListener aListener)
Removes given listener from the list of listeners
Parameters:
aListener - object to remove from the list
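The listener methods above (setNotifyListeners, addStatusCallbackListener, getCallbackListeners, removeStatusCallbackListener) follow a common register-and-notify pattern. The sketch below shows one way such a registry could behave; StatusListenerSketch and ListenerRegistrySketch are hypothetical names, the callback signature is simplified, and the initial value of the notify flag is an arbitrary choice for the example rather than documented behavior.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified listener; the real interface is BaseStatusCallbackListener.
interface StatusListenerSketch {
    void entityProcessComplete(Object cas);
}

// Hypothetical registry mirroring the listener-related methods of the PU.
class ListenerRegistrySketch {
    private final List<StatusListenerSketch> listeners = new ArrayList<>();
    private boolean notifyListeners = true;   // default chosen for this sketch only

    void addStatusCallbackListener(StatusListenerSketch listener) {
        listeners.add(listener);
    }

    void removeStatusCallbackListener(StatusListenerSketch listener) {
        listeners.remove(listener);
    }

    List<StatusListenerSketch> getCallbackListeners() {
        return listeners;
    }

    void setNotifyListeners(boolean doNotify) {
        notifyListeners = doNotify;
    }

    // Notifies every registered listener unless notifications are switched off.
    // Returns the number of listeners that were called.
    int notifyAllListeners(Object cas) {
        if (!notifyListeners) {
            return 0;
        }
        int notified = 0;
        for (StatusListenerSketch listener : listeners) {
            listener.entityProcessComplete(cas);
            notified++;
        }
        return notified;
    }
}
```

Keeping the flag separate from the list lets a caller silence notifications temporarily without losing the registered listeners.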
-
setProcessingUnitProcessTrace
public void setProcessingUnitProcessTrace(ProcessTrace aProcessingUnitProcessTrace)
Plugs in ProcessTrace object used to collect statistics
Parameters:
aProcessingUnitProcessTrace - object to compile stats
-
setUimaTimer
public void setUimaTimer(UimaTimer aTimer)
Plugs in custom timer used by the PU for getting time
Parameters:
aTimer - custom timer to use
-
setContainers
public void setContainers(java.util.LinkedList processorList)
Plugs in a list of Cas Processor containers. During processing, the Cas Processors in this list are called sequentially. Each Cas Processor is held in a container that manages errors, counts and totals, and restarts.
Parameters:
processorList - list of CASProcessor containers to be added to the processing pipeline
-
disableCasProcessor
public void disableCasProcessor(int aCasProcessorIndex)
Disable a CASProcessor in the processing pipeline. Locate it by the provided index. The disabled Cas Processor remains in the Processing Pipeline; however, it is not used during processing.
Parameters:
aCasProcessorIndex - location in the pipeline of the Cas Processor to disable
-
disableCasProcessor
public void disableCasProcessor(java.lang.String aCasProcessorName)
Alternative method to disable a Cas Processor. Uses a name to locate it.
Parameters:
aCasProcessorName - name of the Cas Processor to disable
-
enableCasProcessor
public void enableCasProcessor(java.lang.String aCasProcessorName)
Enables the Cas Processor with a given name. The enabled Cas Processor will immediately begin to receive bundles of Cas.
Parameters:
aCasProcessorName - name of the Cas Processor to enable
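The container list and the enable/disable methods can be pictured as follows: each processor sits in a container that carries a flag and a counter, and a disabled container stays in the list but is skipped. This is a sketch under assumptions, not the UIMA code: ContainerSketch and SwitchablePipelineSketch are hypothetical, Strings stand in for Cas bundles, and the per-container counter is only one plausible form of the "counts and totals" mentioned above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical container: one processor plus an enabled flag and a counter.
class ContainerSketch {
    final String name;
    final UnaryOperator<String> processor;
    boolean enabled = true;
    long processed = 0;

    ContainerSketch(String name, UnaryOperator<String> processor) {
        this.name = name;
        this.processor = processor;
    }
}

// Hypothetical pipeline that calls its containers sequentially.
class SwitchablePipelineSketch {
    private final List<ContainerSketch> containers = new ArrayList<>();

    void setContainers(List<ContainerSketch> processorList) {
        containers.clear();
        containers.addAll(processorList);
    }

    void disableCasProcessor(int index) {
        containers.get(index).enabled = false;
    }

    void disableCasProcessor(String name) {
        setEnabled(name, false);
    }

    void enableCasProcessor(String name) {
        setEnabled(name, true);
    }

    private void setEnabled(String name, boolean enabled) {
        for (ContainerSketch c : containers) {
            if (c.name.equals(name)) {
                c.enabled = enabled;
            }
        }
    }

    // Runs the document through every enabled container, in list order.
    String process(String doc) {
        for (ContainerSketch c : containers) {
            if (!c.enabled) {
                continue;           // disabled processors stay in place but are skipped
            }
            doc = c.processor.apply(doc);
            c.processed++;
        }
        return doc;
    }

    int size() {
        return containers.size();
    }
}
```

Because disabling only flips a flag, re-enabling a processor restores it at its original position in the sequence.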
-
run
public void run()
Starts the Processing Pipeline thread. This thread waits for an artifact to arrive on the configured Work Queue. Once the CAS arrives, it is removed from the queue and sent through the analysis pipeline.
Specified by:
run in interface java.lang.Runnable
Overrides:
run in class java.lang.Thread
-
consumeQueue
public boolean consumeQueue()
Consumes the input queue to make sure all bundles still there get processed before the CPE terminates.
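Draining a queue before shutdown is usually done with a non-blocking poll loop. The sketch below illustrates that idea only; QueueDrainSketch is a hypothetical class, the "processing" step merely copies documents into a sink, and the int return value is a choice made for the example (the documented method returns a boolean whose exact meaning is not stated here).

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Hypothetical helper showing a shutdown-time drain of the input queue.
class QueueDrainSketch {
    // Handles every bundle still queued, without blocking once the queue is empty.
    // Returns the number of bundles that were drained.
    static int consumeQueue(BlockingQueue<String[]> workQueue, List<String> sink) {
        int bundles = 0;
        String[] bundle;
        while ((bundle = workQueue.poll()) != null) {   // poll() returns null when empty
            for (String doc : bundle) {
                sink.add(doc);                          // placeholder for real processing
            }
            bundles++;
        }
        return bundles;
    }
}
```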
-
processNext
protected boolean processNext(java.lang.Object[] aCasObjectList, ProcessTrace pTrTemp) throws ResourceProcessException, java.io.IOException, CollectionException, AbortCPMException, KillPipelineException
Executes the processing pipeline. The given bundle of Cas instances is processed by each Cas Processor in the pipeline. Conversions between different types of Cas Processors are done on the fly. Two types of Cas Processors are currently supported:
- CasDataProcessor
- CasObjectProcessor
Parameters:
aCasObjectList - bundle of Cas to analyze
pTrTemp - object used to aggregate stats
Throws:
ResourceProcessException
java.io.IOException
CollectionException
AbortCPMException
KillPipelineException
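On-the-fly conversion between two processor types can be sketched as a loop that checks what each stage expects and converts the current representation only when it does not match. Everything below is illustrative: DataCasSketch, ObjectCasSketch, and the two processor interfaces are hypothetical stand-ins for the CasData and CAS representations and their processors, and the conversion here simply copies text.

```java
import java.util.List;

// Two hypothetical CAS representations.
class DataCasSketch {
    final String text;
    DataCasSketch(String text) { this.text = text; }
}

class ObjectCasSketch {
    final StringBuilder text;
    ObjectCasSketch(String text) { this.text = new StringBuilder(text); }
}

// Hypothetical processor types, one per representation.
interface DataProcessorSketch {
    DataCasSketch process(DataCasSketch cas);   // returns a new value
}

interface ObjectProcessorSketch {
    void process(ObjectCasSketch cas);          // updates the object in place
}

class ConvertingPipelineSketch {
    // Runs the stages in order, converting the CAS whenever the next stage
    // expects the other representation.
    static Object run(Object cas, List<Object> stages) {
        Object current = cas;
        for (Object stage : stages) {
            if (stage instanceof DataProcessorSketch) {
                if (current instanceof ObjectCasSketch) {
                    current = new DataCasSketch(((ObjectCasSketch) current).text.toString());
                }
                current = ((DataProcessorSketch) stage).process((DataCasSketch) current);
            } else if (stage instanceof ObjectProcessorSketch) {
                if (current instanceof DataCasSketch) {
                    current = new ObjectCasSketch(((DataCasSketch) current).text);
                }
                ((ObjectProcessorSketch) stage).process((ObjectCasSketch) current);
            } else {
                throw new IllegalArgumentException("unsupported stage type");
            }
        }
        return current;
    }
}
```

Converting lazily means a run of same-typed stages pays no conversion cost; a conversion happens only at a boundary between the two processor kinds.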
-
notifyListeners
protected void notifyListeners(java.lang.Object aCas, boolean isCasObject, EntityProcessStatus aEntityProcStatus)
Notifies Listeners of the fact that the pipeline has finished processing the current set of CASes
Parameters:
aCas - object containing either an array of Cas or a single Cas instance
isCasObject - true if the instance of Cas is of type CAS, false otherwise
aEntityProcStatus - status object that may contain exceptions and trace
-
doNotifyListeners
protected void doNotifyListeners(java.lang.Object aCas, boolean isCasObject, EntityProcessStatus aEntityProcStatus)
Notifies all configured listeners. Makes sure that the appropriate type of Cas is sent to the listener. Conversions take place to ensure compatibility.
Parameters:
aCas - Cas to pass to listener
isCasObject - true if Cas is of type CAS
aEntityProcStatus - status object containing exceptions and trace info
-
setReleaseCASFlag
public void setReleaseCASFlag(boolean aFlag)
Called by the CPMEngine during setup to indicate that this thread is supposed to release a CAS at the end of processing. This is typically done for the Cas Consumer thread, but in configurations not using Cas Consumers the processing pipeline may also release the CAS.
Parameters:
aFlag - true if this thread should release a CAS when analysis is complete
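The purpose of the release flag is that exactly one stage, the last one to touch a CAS, returns it to the pool. A minimal sketch of that arrangement follows; CasPoolSketch and ReleasingStageSketch are hypothetical, and StringBuilder instances stand in for pooled CAS objects.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical fixed-size pool of reusable objects.
class CasPoolSketch {
    private final Deque<StringBuilder> free = new ArrayDeque<>();

    CasPoolSketch(int size) {
        for (int i = 0; i < size; i++) {
            free.push(new StringBuilder());
        }
    }

    // Returns a free instance, or null when the pool is exhausted.
    StringBuilder getCas() {
        return free.poll();
    }

    // Resets the instance and makes it available again.
    void releaseCas(StringBuilder cas) {
        cas.setLength(0);
        free.push(cas);
    }

    int available() {
        return free.size();
    }
}

// Hypothetical pipeline stage that releases the CAS only when told to.
class ReleasingStageSketch {
    private final CasPoolSketch pool;
    private boolean releaseCAS = false;

    ReleasingStageSketch(CasPoolSketch pool) {
        this.pool = pool;
    }

    void setReleaseCASFlag(boolean flag) {
        releaseCAS = flag;
    }

    // Called when this stage is done with the CAS.
    void finish(StringBuilder cas) {
        if (releaseCAS) {
            pool.releaseCas(cas);
        }
    }
}
```

In this arrangement an analysis stage would leave the flag false and a final consumer stage would set it to true, so the CAS is returned once and only after all stages have seen it.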
-
stopCasProcessors
public void stopCasProcessors(boolean kill)
Stops all Cas Processors that are part of this PU.
Parameters:
kill - true if the CPE was stopped externally before it finished processing
-
endOfProcessingReached
protected boolean endOfProcessingReached(long aCount)
Returns true if the CPM has finished analyzing the collection.
Parameters:
aCount - running total of documents processed so far
Returns:
true if CPM has processed all docs, false otherwise
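A check of this kind typically compares the running count against a configured limit. The sketch below is an assumption about how such a comparison could look, not the documented logic: EndOfProcessingSketch is hypothetical, and treating -1 as "process the whole collection" is a convention adopted for the example.

```java
// Hypothetical end-of-processing check based on a configured document limit.
class EndOfProcessingSketch {
    // Assumed convention for this sketch: -1 means "no limit".
    static final long PROCESS_ALL = -1;

    static boolean endOfProcessingReached(long numToProcess, long count) {
        if (numToProcess == PROCESS_ALL) {
            return false;               // no limit, so the count never ends processing
        }
        return count >= numToProcess;   // limit reached or exceeded
    }
}
```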
-
process
protected void process(java.lang.Object anArtifact)
Parameters:
anArtifact -
-
showMetadata
protected void showMetadata(java.lang.Object[] aCasList)
Parameters:
aCasList -
-
isProcessorReady
protected boolean isProcessorReady(int aStatus)
Check if the CASProcessor status is available for processing
-
getBytes
protected long getBytes(java.lang.Object aCas)
Returns the size of the CAS object. Currently only CASData is supported.
Parameters:
aCas - Cas to get the size for
Returns:
the size of the CAS object. Currently only CASData is supported.
-
setCasPool
public void setCasPool(CPECasPool aPool)
Parameters:
aPool -
-
analyze
protected boolean analyze(java.lang.Object[] aCasObjectList, ProcessTrace pTrTemp) throws java.lang.Exception
An alternate processing loop designed for the single-threaded CPM.
Parameters:
aCasObjectList - a list of CASes to analyze
pTrTemp - process trace where statistics are added during analysis
Throws:
java.lang.Exception
-
-