Skip to content

Commit

Permalink
Optimize Async Memory. Introduce CoroutineHandle.
Browse files Browse the repository at this point in the history
  • Loading branch information
lganzzzo committed Sep 15, 2019
1 parent 68bbb14 commit 133b79f
Show file tree
Hide file tree
Showing 20 changed files with 178 additions and 161 deletions.
75 changes: 38 additions & 37 deletions src/oatpp/core/async/Coroutine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,35 +186,25 @@ CoroutineStarter& CoroutineStarter::next(CoroutineStarter&& starter) {
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// AbstractCoroutine
// CoroutineHandle

std::shared_ptr<const Error> AbstractCoroutine::ERROR_UNKNOWN = std::make_shared<Error>("Unknown Error");
std::shared_ptr<const Error> CoroutineHandle::ERROR_UNKNOWN = std::make_shared<Error>("Unknown Error");

AbstractCoroutine::AbstractCoroutine()
: _CP(this)
CoroutineHandle::CoroutineHandle(Processor* processor, AbstractCoroutine* rootCoroutine)
: _PP(processor)
, _CP(rootCoroutine)
, _FP(&AbstractCoroutine::act)
, _ERR(nullptr)
, _PP(nullptr)
, _SCH_A(Action::TYPE_NONE)
, _ref(nullptr)
, m_parent(nullptr)
, m_propagatedError(&_ERR)
, m_parentReturnAction(Action(Action::TYPE_FINISH))
{}

Action AbstractCoroutine::iterate() {
try {
return _CP->call(_FP);
} catch (std::exception& e) {
*m_propagatedError = std::make_shared<Error>(e.what());
return Action::TYPE_ERROR;
} catch (...) {
*m_propagatedError = ERROR_UNKNOWN;
return Action::TYPE_ERROR;
}
};
CoroutineHandle::~CoroutineHandle() {
delete _CP;
}

Action AbstractCoroutine::takeAction(Action&& action) {
Action CoroutineHandle::takeAction(Action&& action) {

AbstractCoroutine* savedCP;

Expand All @@ -224,15 +214,10 @@ Action AbstractCoroutine::takeAction(Action&& action) {
action.m_data.coroutine->m_parent = _CP;
action.m_data.coroutine->m_propagatedError = m_propagatedError;
_CP = action.m_data.coroutine;
_FP = action.m_data.coroutine->_FP;

_FP = &AbstractCoroutine::act;
return std::forward<oatpp::async::Action>(action);

case Action::TYPE_FINISH:
if(_CP == this) {
_CP = nullptr;
return std::forward<oatpp::async::Action>(action);
}

savedCP = _CP;
_CP = _CP->m_parent;
Expand All @@ -252,14 +237,9 @@ Action AbstractCoroutine::takeAction(Action&& action) {
do {
action = _CP->handleError(*m_propagatedError);
if(action.m_type == Action::TYPE_ERROR) {
if(_CP == this) {
_CP = nullptr;
return std::forward<oatpp::async::Action>(action);
} else {
savedCP = _CP;
_CP = _CP->m_parent;
delete savedCP;
}
savedCP = _CP;
_CP = _CP->m_parent;
delete savedCP;
} else {
action = takeAction(std::forward<oatpp::async::Action>(action));
}
Expand All @@ -274,15 +254,36 @@ Action AbstractCoroutine::takeAction(Action&& action) {

}

Action AbstractCoroutine::handleError(const std::shared_ptr<const Error>& error) {
(void)error;
return Action::TYPE_ERROR;
Action CoroutineHandle::iterate() {
try {
return _CP->call(_FP);
} catch (std::exception& e) {
*m_propagatedError = std::make_shared<Error>(e.what());
return Action::TYPE_ERROR;
} catch (...) {
*m_propagatedError = ERROR_UNKNOWN;
return Action::TYPE_ERROR;
}
}

bool AbstractCoroutine::finished() const {
bool CoroutineHandle::finished() const {
return _CP == nullptr;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// AbstractCoroutine

AbstractCoroutine::AbstractCoroutine()
: m_parent(nullptr)
, m_propagatedError(nullptr)
, m_parentReturnAction(Action(Action::TYPE_NONE))
{}

Action AbstractCoroutine::handleError(const std::shared_ptr<const Error>& error) {
(void)error;
return Action::TYPE_ERROR;
}

/**
* Get parent coroutine
* @return - pointer to a parent coroutine
Expand Down
67 changes: 40 additions & 27 deletions src/oatpp/core/async/Coroutine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

namespace oatpp { namespace async {

class CoroutineHandle; // FWD
class AbstractCoroutine; // FWD
class Processor; // FWD
class CoroutineStarter; // FWD
Expand All @@ -54,6 +55,7 @@ namespace worker {
*/
class Action {
friend Processor;
friend CoroutineHandle;
friend AbstractCoroutine;
friend CoroutineStarter;
friend worker::Worker;
Expand Down Expand Up @@ -370,14 +372,48 @@ class CoroutineStarter {
};

/**
* Abstract Coroutine. Base class for Coroutines. It provides state management, coroutines stack management and error reporting functionality.
* This class manages coroutines processing state and a chain of coroutine calls.
*/
class AbstractCoroutine : public oatpp::base::Countable {
friend oatpp::collection::FastQueue<AbstractCoroutine>;
class CoroutineHandle : public oatpp::base::Countable {
friend oatpp::collection::FastQueue<CoroutineHandle>;
friend Processor;
friend CoroutineStarter;
friend worker::Worker;
friend CoroutineWaitList;
public:
typedef oatpp::async::Action Action;
typedef oatpp::async::Error Error;
typedef Action (AbstractCoroutine::*FunctionPtr)();
private:
static std::shared_ptr<const Error> ERROR_UNKNOWN;
private:
Processor* _PP;
AbstractCoroutine* _CP;
FunctionPtr _FP;
std::shared_ptr<const Error> _ERR;
oatpp::async::Action _SCH_A;
CoroutineHandle* _ref;
private:
std::shared_ptr<const Error>* m_propagatedError;
private:
Action takeAction(Action&& action);
public:

CoroutineHandle(Processor* processor, AbstractCoroutine* rootCoroutine);

~CoroutineHandle();

Action iterate();

bool finished() const;

};

/**
* Abstract Coroutine. Base class for Coroutines. It provides state management, coroutines stack management and error reporting functionality.
*/
class AbstractCoroutine : public oatpp::base::Countable {
friend CoroutineStarter;
friend CoroutineHandle;
public:
/**
* Convenience typedef for Action
Expand Down Expand Up @@ -418,35 +454,18 @@ class AbstractCoroutine : public oatpp::base::Countable {
return std::unique_ptr<AbstractMemberCaller<Args...>>(new MemberCaller<T, Args...>(func));
}

private:
static std::shared_ptr<const Error> ERROR_UNKNOWN;
private:
AbstractCoroutine* _CP;
FunctionPtr _FP;
std::shared_ptr<const Error> _ERR;
Processor* _PP;
oatpp::async::Action _SCH_A;
AbstractCoroutine* _ref;
private:
AbstractCoroutine* m_parent;
std::shared_ptr<const Error>* m_propagatedError;
protected:
oatpp::async::Action m_parentReturnAction;
private:
Action takeAction(Action&& action);
public:

/**
* Constructor.
*/
AbstractCoroutine();

/**
* Make one Coroutine iteration.
* @return - control Action.
*/
Action iterate();

/**
* Virtual Destructor
*/
Expand Down Expand Up @@ -477,12 +496,6 @@ class AbstractCoroutine : public oatpp::base::Countable {
*/
virtual Action handleError(const std::shared_ptr<const Error>& error);

/**
* Check if coroutine is finished
* @return - true if finished
*/
bool finished() const;

/**
* Get parent coroutine
* @return - pointer to a parent coroutine
Expand Down
4 changes: 2 additions & 2 deletions src/oatpp/core/async/CoroutineWaitList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ void CoroutineWaitList::setListener(Listener* listener) {
m_listener = listener;
}

void CoroutineWaitList::pushFront(AbstractCoroutine* coroutine) {
void CoroutineWaitList::pushFront(CoroutineHandle* coroutine) {
{
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_lock);
m_list.pushFront(coroutine);
Expand All @@ -52,7 +52,7 @@ void CoroutineWaitList::pushFront(AbstractCoroutine* coroutine) {
}
}

void CoroutineWaitList::pushBack(AbstractCoroutine* coroutine) {
void CoroutineWaitList::pushBack(CoroutineHandle* coroutine) {
{
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_lock);
m_list.pushBack(coroutine);
Expand Down
6 changes: 3 additions & 3 deletions src/oatpp/core/async/CoroutineWaitList.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class CoroutineWaitList {
virtual void onNewItem(CoroutineWaitList& list) = 0;
};
private:
oatpp::collection::FastQueue<AbstractCoroutine> m_list;
oatpp::collection::FastQueue<CoroutineHandle> m_list;
oatpp::concurrency::SpinLock m_lock;
Listener* m_listener = nullptr;
protected:
Expand All @@ -65,14 +65,14 @@ class CoroutineWaitList {
* This method should be called by Coroutine Processor only.
* @param coroutine
*/
void pushFront(AbstractCoroutine* coroutine);
void pushFront(CoroutineHandle* coroutine);

/*
* Put coroutine on wait-list.
* This method should be called by Coroutine Processor only.
* @param coroutine
*/
void pushBack(AbstractCoroutine* coroutine);
void pushBack(CoroutineHandle* coroutine);
public:

/**
Expand Down
4 changes: 2 additions & 2 deletions src/oatpp/core/async/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ void Executor::SubmissionProcessor::run() {

}

void Executor::SubmissionProcessor::pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) {
void Executor::SubmissionProcessor::pushTasks(oatpp::collection::FastQueue<CoroutineHandle>& tasks) {
(void)tasks;
std::runtime_error("[oatpp::async::Executor::SubmissionProcessor::pushTasks]: Error. This method does nothing.");
}

void Executor::SubmissionProcessor::pushOneTask(AbstractCoroutine* task) {
void Executor::SubmissionProcessor::pushOneTask(CoroutineHandle* task) {
(void)task;
std::runtime_error("[oatpp::async::Executor::SubmissionProcessor::pushOneTask]: Error. This method does nothing.");
}
Expand Down
4 changes: 2 additions & 2 deletions src/oatpp/core/async/Executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ class Executor {

oatpp::async::Processor& getProcessor();

void pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) override;
void pushTasks(oatpp::collection::FastQueue<CoroutineHandle>& tasks) override;

void pushOneTask(AbstractCoroutine* task) override;
void pushOneTask(CoroutineHandle* task) override;

void run();

Expand Down
21 changes: 10 additions & 11 deletions src/oatpp/core/async/Processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ void Processor::addWorker(const std::shared_ptr<worker::Worker>& worker) {

case worker::Worker::Type::IO:
m_ioWorkers.push_back(worker);
m_ioPopQueues.push_back(collection::FastQueue<AbstractCoroutine>());
m_ioPopQueues.push_back(collection::FastQueue<CoroutineHandle>());
break;

case worker::Worker::Type::TIMER:
m_timerWorkers.push_back(worker);
m_timerPopQueues.push_back(collection::FastQueue<AbstractCoroutine>());
m_timerPopQueues.push_back(collection::FastQueue<CoroutineHandle>());
break;

default:
Expand All @@ -49,7 +49,7 @@ void Processor::addWorker(const std::shared_ptr<worker::Worker>& worker) {

}

void Processor::popIOTask(AbstractCoroutine* coroutine) {
void Processor::popIOTask(CoroutineHandle* coroutine) {
if(m_ioPopQueues.size() > 0) {
auto &queue = m_ioPopQueues[(++m_ioBalancer) % m_ioPopQueues.size()];
queue.pushBack(coroutine);
Expand All @@ -59,7 +59,7 @@ void Processor::popIOTask(AbstractCoroutine* coroutine) {
}
}

void Processor::popTimerTask(AbstractCoroutine* coroutine) {
void Processor::popTimerTask(CoroutineHandle* coroutine) {
if(m_timerPopQueues.size() > 0) {
auto &queue = m_timerPopQueues[(++m_timerBalancer) % m_timerPopQueues.size()];
queue.pushBack(coroutine);
Expand All @@ -69,7 +69,7 @@ void Processor::popTimerTask(AbstractCoroutine* coroutine) {
}
}

void Processor::addCoroutine(AbstractCoroutine* coroutine) {
void Processor::addCoroutine(CoroutineHandle* coroutine) {

if(coroutine->_PP == this) {

Expand Down Expand Up @@ -105,23 +105,23 @@ void Processor::addCoroutine(AbstractCoroutine* coroutine) {
action.m_type = Action::TYPE_NONE;

} else {
throw std::runtime_error("[oatpp::async::processor::addCoroutine()]: Error. Attempt to schedule coroutine to wrong processor.");
throw std::runtime_error("[oatpp::async::processor::addTask()]: Error. Attempt to schedule coroutine to wrong processor.");
}

}

void Processor::pushOneTask(AbstractCoroutine* coroutine) {
void Processor::pushOneTask(CoroutineHandle* coroutine) {
{
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_taskLock);
m_pushList.pushBack(coroutine);
}
m_taskCondition.notify_one();
}

void Processor::pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) {
void Processor::pushTasks(oatpp::collection::FastQueue<CoroutineHandle>& tasks) {
{
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_taskLock);
collection::FastQueue<AbstractCoroutine>::moveAll(tasks, m_pushList);
collection::FastQueue<CoroutineHandle>::moveAll(tasks, m_pushList);
}
m_taskCondition.notify_one();
}
Expand Down Expand Up @@ -154,8 +154,7 @@ void Processor::popTasks() {
void Processor::consumeAllTasks() {
for(auto& submission : m_taskList) {
auto coroutine = submission->createCoroutine();
coroutine->_PP = this;
m_queue.pushBack(coroutine);
m_queue.pushBack(new CoroutineHandle(this, coroutine));
}
m_taskList.clear();
}
Expand Down
Loading

0 comments on commit 133b79f

Please sign in to comment.