Skip to content

Commit

Permalink
Fix bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
Barenboim committed Jun 27, 2023
1 parent bedb548 commit a778502
Showing 1 changed file with 18 additions and 8 deletions.
26 changes: 18 additions & 8 deletions src/client/WFKafkaClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ class KafkaClientTask : public WFKafkaTask

int arrange_offset();

void dispatch_locked();
int dispatch_locked();

inline KafkaBroker *get_broker(int node_id)
{
Expand Down Expand Up @@ -871,27 +871,30 @@ bool KafkaClientTask::check_meta()
return false;
}

void KafkaClientTask::dispatch_locked()
int KafkaClientTask::dispatch_locked()
{
__WFKafkaTask *task;
ParallelWork *parallel;
SeriesWork *series;

if (this->check_cgroup() == false || this->check_meta() == false)
return;
if (this->check_cgroup() == false)
return this->member->cgroup_wait_cnt > 0;

if (this->check_meta() == false)
return this->member->meta_wait_cnt > 0;

if (arrange_toppar(this->api_type) < 0)
{
this->state = WFT_STATE_TASK_ERROR;
this->error = WFT_ERR_KAFKA_ARRANGE_FAILED;
this->finish = true;
return;
return 0;
}

if (this->member->cgroup_outdated)
{
series_of(this)->push_front(this);
return;
return 0;
}

switch(this->api_type)
Expand Down Expand Up @@ -1126,6 +1129,8 @@ void KafkaClientTask::dispatch_locked()
this->finish = true;
break;
}

return 0;
}

void KafkaClientTask::dispatch()
Expand All @@ -1144,6 +1149,7 @@ void KafkaClientTask::dispatch()
this->state = task->get_state();
this->error = task->get_error();
this->kafka_error = get_kafka_error();
this->finish = true;
this->subtask_done();
return;
}
Expand All @@ -1156,11 +1162,15 @@ void KafkaClientTask::dispatch()

this->generate_info();

int flag;
this->member->mutex.lock();
this->dispatch_locked();
flag = this->dispatch_locked();
if (flag)
this->subtask_done();
this->member->mutex.unlock();

this->subtask_done();
if (!flag)
this->subtask_done();
}

bool KafkaClientTask::add_topic(const std::string& topic)
Expand Down

0 comments on commit a778502

Please sign in to comment.