pypushflow.tests.test_stop_workflow.WorkflowSleep

class pypushflow.tests.test_stop_workflow.WorkflowSleep(name, **kw)[source]

Bases: Workflow

addActorRef(actorRef)
connectOnError(actor)
endWorkflow(status)
getActorPath()
getListActorRef()
property pool
run(inData, timeout=None, **pool_options)
setStatus(status)
stop(reason='interrupt workflow', forced_interruption=None)
Parameters:
  • reason (str) –

  • forced_interruption (Optional[bool]) –

property stop_exception: BaseException | None
triggerOnError(inData)