Skip to content

Commit

Permalink
Error Handling. Async. Better error handling and error propagation.
Browse files Browse the repository at this point in the history
  • Loading branch information
lganzzzo committed Sep 20, 2019
1 parent c178ab4 commit cb9b2b2
Show file tree
Hide file tree
Showing 28 changed files with 157 additions and 220 deletions.
1 change: 0 additions & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ add_library(oatpp
oatpp/core/concurrency/SpinLock.hpp
oatpp/core/concurrency/Thread.cpp
oatpp/core/concurrency/Thread.hpp
oatpp/core/data/IODefinitions.cpp
oatpp/core/data/IODefinitions.hpp
oatpp/core/data/buffer/FIFOBuffer.cpp
oatpp/core/data/buffer/FIFOBuffer.hpp
Expand Down
120 changes: 72 additions & 48 deletions src/oatpp/core/async/Coroutine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ Action::Action(const FunctionPtr& functionPtr)
m_data.fptr = functionPtr;
}

Action::Action(Error* error)
: m_type(TYPE_ERROR)
{
m_data.error = error;
}

Action::Action(v_int32 type)
: m_type(type)
{}
Expand All @@ -89,12 +95,24 @@ Action::Action(Action&& other)
}

Action::~Action() {
if(m_type == TYPE_COROUTINE) {
delete m_data.coroutine;
free();
}

void Action::free() {
switch(m_type) {
case TYPE_COROUTINE:
delete m_data.coroutine;
break;

case TYPE_ERROR:
delete m_data.error;
break;
}
m_type = TYPE_NONE;
}

Action& Action::operator=(Action&& other) {
free();
m_type = other.m_type;
m_data = other.m_data;
other.m_data.fptr = nullptr;
Expand Down Expand Up @@ -188,16 +206,12 @@ CoroutineStarter& CoroutineStarter::next(CoroutineStarter&& starter) {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// CoroutineHandle

std::shared_ptr<const Error> CoroutineHandle::ERROR_UNKNOWN = std::make_shared<Error>("Unknown Error");

CoroutineHandle::CoroutineHandle(Processor* processor, AbstractCoroutine* rootCoroutine)
: _PP(processor)
, _CP(rootCoroutine)
, _FP(&AbstractCoroutine::act)
, _ERR(nullptr)
, _SCH_A(Action::TYPE_NONE)
, _ref(nullptr)
, m_propagatedError(&_ERR)
{}

CoroutineHandle::~CoroutineHandle() {
Expand All @@ -206,50 +220,65 @@ CoroutineHandle::~CoroutineHandle() {

Action CoroutineHandle::takeAction(Action&& action) {

AbstractCoroutine* savedCP;

switch (action.m_type) {
while (true) {

case Action::TYPE_COROUTINE:
action.m_data.coroutine->m_parent = _CP;
action.m_data.coroutine->m_propagatedError = m_propagatedError;
_CP = action.m_data.coroutine;
_FP = &AbstractCoroutine::act;
return std::forward<oatpp::async::Action>(action);
switch (action.m_type) {

case Action::TYPE_FINISH:
case Action::TYPE_COROUTINE: {
action.m_data.coroutine->m_parent = _CP;
_CP = action.m_data.coroutine;
_FP = &AbstractCoroutine::act;
action.m_type = Action::TYPE_NONE;
return std::forward<oatpp::async::Action>(action);
}

savedCP = _CP;
_CP = _CP->m_parent;
_FP = nullptr;
/* Please note that savedCP->m_parentReturnAction should not be "REPEAT nor WAIT_RETRY" */
/* as funtion pointer (FP) is invalidated */
action = takeAction(std::move(savedCP->m_parentReturnAction));
delete savedCP;
case Action::TYPE_FINISH: {
/* Please note that savedCP->m_parentReturnAction should not be "REPEAT nor WAIT_RETRY" */
/* as funtion pointer (FP) is invalidated */
action = std::move(_CP->m_parentReturnAction);
AbstractCoroutine* savedCP = _CP;
_CP = _CP->m_parent;
_FP = nullptr;
delete savedCP;
continue;
}

return std::forward<oatpp::async::Action>(action);
case Action::TYPE_YIELD_TO: {
_FP = action.m_data.fptr;
return std::forward<oatpp::async::Action>(action);
}

case Action::TYPE_YIELD_TO:
_FP = action.m_data.fptr;
return std::forward<oatpp::async::Action>(action);
case Action::TYPE_ERROR: {
Action newAction = _CP->handleError(action.m_data.error);

case Action::TYPE_ERROR:
do {
action = _CP->handleError(*m_propagatedError);
if(action.m_type == Action::TYPE_ERROR) {
savedCP = _CP;
if (newAction.m_type == Action::TYPE_ERROR) {
AbstractCoroutine* savedCP = _CP;
_CP = _CP->m_parent;
delete savedCP;
if (newAction.m_data.error == action.m_data.error) {
newAction.m_type = Action::TYPE_NONE;
} else {
action = std::move(newAction);
}
if(_CP == nullptr) {
delete action.m_data.error;
action.m_type = Action::TYPE_NONE;
return std::forward<oatpp::async::Action>(action);
}
} else {
action = takeAction(std::forward<oatpp::async::Action>(action));
action = std::move(newAction);
}
} while (action.m_type == Action::TYPE_ERROR && _CP != nullptr);

return std::forward<oatpp::async::Action>(action);
continue;
}

};
default:
return std::forward<oatpp::async::Action>(action);

};

}

//throw std::runtime_error("[oatpp::async::AbstractCoroutine::takeAction()]: Error. Unknown Action.");
return std::forward<oatpp::async::Action>(action);

}
Expand All @@ -258,11 +287,9 @@ Action CoroutineHandle::iterate() {
try {
return _CP->call(_FP);
} catch (std::exception& e) {
*m_propagatedError = std::make_shared<Error>(e.what());
return Action::TYPE_ERROR;
return new Error(e.what());
} catch (...) {
*m_propagatedError = ERROR_UNKNOWN;
return Action::TYPE_ERROR;
return new Error("[oatpp::async::CoroutineHandle::iterate()]: Error. Unknown Exception.");
}
}

Expand All @@ -275,13 +302,11 @@ bool CoroutineHandle::finished() const {

AbstractCoroutine::AbstractCoroutine()
: m_parent(nullptr)
, m_propagatedError(nullptr)
, m_parentReturnAction(Action(Action::TYPE_NONE))
{}

Action AbstractCoroutine::handleError(const std::shared_ptr<const Error>& error) {
(void)error;
return Action::TYPE_ERROR;
Action AbstractCoroutine::handleError(Error* error) {
return Action(error);
}

/**
Expand All @@ -292,9 +317,8 @@ AbstractCoroutine* AbstractCoroutine::getParent() const {
return m_parent;
}

Action AbstractCoroutine::error(const std::shared_ptr<const Error>& error) {
*m_propagatedError = error;
return Action::TYPE_ERROR;
Action AbstractCoroutine::error(Error* error) {
return error;
}

}}
30 changes: 14 additions & 16 deletions src/oatpp/core/async/Coroutine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,16 @@ class Action {
union Data {
FunctionPtr fptr;
AbstractCoroutine* coroutine;
Error* error;
IOData ioData;
v_int64 timePointMicroseconds;
CoroutineWaitList* waitList;
};
private:
mutable v_int32 m_type;
Data m_data;
private:
void free();
protected:
/*
* Create Action by type.
Expand Down Expand Up @@ -247,6 +250,12 @@ class Action {
*/
Action(const FunctionPtr& functionPtr);

/**
* Constructor. Create Error Action.
* @param error - pointer to &id:oatpp::async::Error;.
*/
Action(Error* error);

/**
* Deleted copy-constructor.
*/
Expand Down Expand Up @@ -383,17 +392,12 @@ class CoroutineHandle : public oatpp::base::Countable {
typedef oatpp::async::Action Action;
typedef oatpp::async::Error Error;
typedef Action (AbstractCoroutine::*FunctionPtr)();
private:
static std::shared_ptr<const Error> ERROR_UNKNOWN;
private:
Processor* _PP;
AbstractCoroutine* _CP;
FunctionPtr _FP;
std::shared_ptr<const Error> _ERR;
oatpp::async::Action _SCH_A;
CoroutineHandle* _ref;
private:
std::shared_ptr<const Error>* m_propagatedError;
private:
Action takeAction(Action&& action);
public:
Expand Down Expand Up @@ -456,7 +460,6 @@ class AbstractCoroutine : public oatpp::base::Countable {

private:
AbstractCoroutine* m_parent;
std::shared_ptr<const Error>* m_propagatedError;
protected:
oatpp::async::Action m_parentReturnAction;
public:
Expand Down Expand Up @@ -490,11 +493,11 @@ class AbstractCoroutine : public oatpp::base::Countable {
/**
* Default implementation of handleError(error) function.
* User may override this function in order to handle errors.
* @param error - error.
* @param error - &id:oatpp::async::Error;.
* @return - Action. If handleError function returns Error,
* current coroutine will finish, return control to caller coroutine and handleError is called for caller coroutine.
*/
virtual Action handleError(const std::shared_ptr<const Error>& error);
virtual Action handleError(Error* error);

/**
* Get parent coroutine
Expand All @@ -504,14 +507,10 @@ class AbstractCoroutine : public oatpp::base::Countable {

/**
* Convenience method to generate error reporting Action.
* @param message - error message.
* @param error - &id:oatpp:async::Error;.
* @return - error reporting Action.
*/
Action error(const std::shared_ptr<const Error>& error);

Action propagateError() {
return Action(Action::TYPE_ERROR);
}
Action error(Error* error);

/**
* Convenience method to generate error reporting Action.
Expand All @@ -522,8 +521,7 @@ class AbstractCoroutine : public oatpp::base::Countable {
*/
template<class E, typename ... Args>
Action error(Args... args) {
*m_propagatedError = std::make_shared<E>(args...);
return Action(Action::TYPE_ERROR);
return error(new E(args...));
}

};
Expand Down
4 changes: 3 additions & 1 deletion src/oatpp/core/async/Error.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
#ifndef oatpp_async_Error_hpp
#define oatpp_async_Error_hpp

#include "oatpp/core/base/Countable.hpp"

namespace oatpp { namespace async {

/**
* Class to hold and communicate errors between Coroutines
*/
class Error {
class Error : public oatpp::base::Countable {
private:
const char* m_what;
public:
Expand Down
4 changes: 0 additions & 4 deletions src/oatpp/core/async/Processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ void Processor::addCoroutine(CoroutineHandle* coroutine) {

}

action.m_type = Action::TYPE_NONE;

} else {
throw std::runtime_error("[oatpp::async::processor::addTask()]: Error. Attempt to schedule coroutine to wrong processor.");
}
Expand Down Expand Up @@ -242,8 +240,6 @@ bool Processor::iterate(v_int32 numIterations) {
// m_queue.round();
}

action.m_type = Action::TYPE_NONE;

}

}
Expand Down
2 changes: 0 additions & 2 deletions src/oatpp/core/async/worker/IOWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,6 @@ void IOWorker::run() {

}

dismissAction(action);

++ consumeIteration;
if(consumeIteration == 100) {
consumeIteration = 0;
Expand Down
2 changes: 0 additions & 2 deletions src/oatpp/core/async/worker/TimerWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,6 @@ void TimerWorker::run() {

}

dismissAction(action);

}

prev = curr;
Expand Down
32 changes: 0 additions & 32 deletions src/oatpp/core/data/IODefinitions.cpp

This file was deleted.

Loading

0 comments on commit cb9b2b2

Please sign in to comment.