[Bug]: Workers suddenly stopped processing jobs on AWS memorydb #2757

Open
1 task done
nullndr opened this issue Sep 4, 2024 · 11 comments
Labels
bug Something isn't working

Comments

@nullndr
Contributor

nullndr commented Sep 4, 2024

Version

5.12.12

Platform

NodeJS

What happened?

I was using bullmq 5.6, where I ran into issue #2466.

After upgrading to 5.12, some workers suddenly stopped processing jobs. I think they got stuck, since I was unable to gracefully shut them down with the following code, which works flawlessly in 5.6:

let isClosing = false;

const runOnce = async (callback: () => Promise<void>) => {
  if (!isClosing) {
    isClosing = true;
    await callback();
  }
};

const closeFlows = async () => {
  const res = await Promise.allSettled([
    runAutomationFlow.close(),
    runCampaignFlow.close(),
  ]);

  if (res.some(({ status }) => status === "rejected")) {
    logger(logger.err("Something went wrong while closing flows"));
  } else {
    logger(logger.ok("Flows successfully closed"));
  }
};

const closeQueues = async () => {
  const res = await Promise.allSettled([
    foo.queue.close(),
    bar.queue.close(),
    baz.queue.close(),
  ]);

  if (res.some(({ status }) => status === "rejected")) {
    logger(logger.err("Something went wrong while closing queues"));
  } else {
    logger(logger.ok("Queues successfully closed"));
  }
};

const closeWorkers = async () => {
  const res = await Promise.allSettled([
    foo.worker.close(),
    bar.worker.close(),
    baz.worker.close(),
  ]);

  if (res.some(({ status }) => status === "rejected")) {
    logger(logger.err("Something went wrong while closing workers"));
  } else {
    logger(logger.ok("Workers successfully closed"));
  }
};

const disconnectDb = async () => {
  try {
    await db.$disconnect();
    logger(logger.ok("Database connection successfully closed"));
  } catch (error) {
    logger(
      logger.err("Something went wrong while disconnecting the database"),
      {
        error,
      },
    );
    throw error;
  }
};

const disconnectRedis = async () => {
  const res = await Promise.allSettled([
    queueRedisConnection.quit(),
    workerRedisConnection.quit(),
    flowRedisConnection.quit(),
  ]);

  if (res.some(({ status }) => status === "rejected")) {
    logger(logger.err("Something went wrong while closing redis connections"));
  } else {
    logger(logger.ok("Redis connections successfully closed"));
  }
};

const closeAll = async () => {
  await Promise.allSettled([closeWorkers(), closeQueues(), closeFlows()]);
  /**
   * The database and the redis connection must be closed after all workers complete.
   */
  await Promise.allSettled([disconnectDb(), disconnectRedis()]);
  await notify({
    type: NotifyType.JobStopped,
    pid: process.pid,
  });
};
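
Because the symptom here is a `close()` that never seems to return, racing each `close()` against a timer is one way to tell a stuck worker from a slow one. The following is only a minimal sketch: `Closable`, `CloseResult`, and `closeWithTimeout` are hypothetical names that are not part of BullMQ, and the only assumption is that each resource exposes an async `close()`.

```typescript
// Hypothetical stand-in for anything with an async close(),
// such as a BullMQ Worker, Queue, or FlowProducer.
interface Closable {
  close(): Promise<unknown>;
}

type CloseResult = "closed" | "timeout" | "failed";

// Races close() against a timer so that a hung close() cannot block shutdown forever.
const closeWithTimeout = async (
  resource: Closable,
  timeoutMs: number,
): Promise<CloseResult> => {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<CloseResult>((resolve) => {
    timer = setTimeout(() => resolve("timeout"), timeoutMs);
  });
  // Map both outcomes of close() to a label instead of throwing.
  const closing = resource.close().then(
    (): CloseResult => "closed",
    (): CloseResult => "failed",
  );
  try {
    return await Promise.race([closing, timeout]);
  } finally {
    // Always clear the timer so it does not keep the process alive.
    clearTimeout(timer);
  }
};
```

For example, `await closeWithTimeout(foo.worker, 30_000)` (the 30 second value is arbitrary) resolving to `"timeout"` would point at that specific worker as the stuck one. BullMQ's `Worker.close()` also accepts a `force` argument that skips waiting for active jobs, which may be worth trying as a fallback.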

My configs for the queues, workers and flows are the following:

const baseRedisOptions: RedisOptions = {
  maxRetriesPerRequest: null,
  enableReadyCheck: false,
  showFriendlyErrorStack: true,
  retryStrategy: (t) => t * t * 1000,
  tls:
    process.env.NODE_ENV === "production" && Env.get("CLUSTER_MODE_ENABLED")
      ? /*
         * This is the same as the `--skipverify` flag in redli.
         * In production we must have a strong certificate, with a known authority.
         */
        { rejectUnauthorized: false }
      : undefined,
};

const queueRedisOptions: RedisOptions = {
  ...baseRedisOptions,
  enableOfflineQueue: false,
};

const workerRedisOptions: RedisOptions = {
  ...baseRedisOptions,
  enableOfflineQueue: true,
  maxRetriesPerRequest: null,
};

export const queueRedisConnection =
  process.env.NODE_ENV === "production" && Env.get("CLUSTER_MODE_ENABLED")
    ? new Redis.Cluster([{ host: Env.get("REDIS_HOST"), port: 6379 }], {
        clusterRetryStrategy: () => null,
        enableReadyCheck: false,
        enableOfflineQueue: false,
        showFriendlyErrorStack: true,
        redisOptions: {
          tls: {
            checkServerIdentity: () => undefined,
          },
          username: Env.get("REDIS_USERNAME"),
          password: Env.get("REDIS_PASSWORD"),
        },
      })
    : new Redis(Env.get("REDIS_HOST"), queueRedisOptions);

export const workerRedisConnection =
  process.env.NODE_ENV === "production" && Env.get("CLUSTER_MODE_ENABLED")
    ? new Redis.Cluster([{ host: Env.get("REDIS_HOST"), port: 6379 }], {
        clusterRetryStrategy: () => null,
        enableReadyCheck: false,
        enableOfflineQueue: true,
        showFriendlyErrorStack: true,
        redisOptions: {
          tls: {
            checkServerIdentity: () => undefined,
          },
          username: Env.get("REDIS_USERNAME"),
          password: Env.get("REDIS_PASSWORD"),
          maxRetriesPerRequest: null,
        },
      })
    : new Redis(Env.get("REDIS_HOST"), workerRedisOptions);

export const flowRedisConnection =
  process.env.NODE_ENV === "production" && Env.get("CLUSTER_MODE_ENABLED")
    ? new Redis.Cluster([{ host: Env.get("REDIS_HOST"), port: 6379 }], {
        clusterRetryStrategy: () => null,
        enableReadyCheck: false,
        enableOfflineQueue: false,
        showFriendlyErrorStack: true,
        redisOptions: {
          tls: {
            checkServerIdentity: () => undefined,
          },
          username: Env.get("REDIS_USERNAME"),
          password: Env.get("REDIS_PASSWORD"),
        },
      })
    : new Redis(Env.get("REDIS_HOST"), queueRedisOptions);

The sudden stop in job processing is easy to see in the MemoryDB metrics:

image

Please let me know how I can provide more useful information.

  • I agree to follow this project's Code of Conduct
@nullndr nullndr added the bug Something isn't working label Sep 4, 2024
@manast
Contributor

manast commented Sep 4, 2024

Unfortunately, there is not a lot for us to go on with this information. Are there jobs in the wait list or delayed? Are all the expected workers actually online?
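
One way to answer the wait/delayed question from inside the AWS network is BullMQ's `Queue.getJobCounts()`. This is a sketch rather than a definitive diagnostic: `CountSource` and `describeBacklog` are hypothetical names, and the interface is a structural stand-in assumed to be compatible with a real `Queue`.

```typescript
// Hypothetical structural stand-in for the part of BullMQ's Queue used here.
interface CountSource {
  getJobCounts(...types: string[]): Promise<Record<string, number>>;
}

// Summarizes how many jobs are waiting, delayed, and active in one line.
const describeBacklog = async (queue: CountSource): Promise<string> => {
  const counts = await queue.getJobCounts("waiting", "delayed", "active");
  const waiting = counts["waiting"] ?? 0;
  const delayed = counts["delayed"] ?? 0;
  const active = counts["active"] ?? 0;
  return `waiting=${waiting} delayed=${delayed} active=${active}`;
};
```

Calling `describeBacklog(foo.queue)` while the problem is occurring would show whether jobs are piling up in the wait list while nothing is active.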

@nullndr
Contributor Author

nullndr commented Sep 4, 2024

The jobs are not delayed ones, so they should be in the wait list.

> are all the expected workers actually online?

Is there a simple way I can check this? Also what is the reason for which they can go offline?

@manast
Contributor

manast commented Sep 4, 2024

> The jobs are not delayed ones, so they should be in the wait list.

Could you please verify that this is the case, as "should" here seems to imply you do not know for sure...

> Is there a simple way I can check this? Also what is the reason for which they can go offline?

You can use Taskforce.sh or any other frontend to check which workers are online for a given queue.

@nullndr
Contributor Author

nullndr commented Sep 9, 2024

> Could you please verify that this is the case, as "should" here seems to imply you do not know for sure...

Yes, as soon as we hit the same issue again I will check it.

> You can use Taskforce.sh or any other frontend to check which workers are online for a given queue.

Sadly, I use AWS MemoryDB, which cannot be reached from outside AWS. I am thinking about writing a small script that uses Queue.getWorkers(); is this the correct way to check this?

Also, I took a look again at the docs and found out about the listener for the error event:

image

I will add it and check again.
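
For reference, the listener mentioned in the docs is attached with `worker.on("error", ...)`. The sketch below wraps that in a small helper; `ErrorEmitter` and `attachErrorLogging` are hypothetical names, and the interface is only a structural stand-in assumed to match a BullMQ `Worker`.

```typescript
// Hypothetical structural stand-in for the "error" event of a BullMQ Worker.
interface ErrorEmitter {
  on(event: "error", listener: (err: Error) => void): unknown;
}

// Logs every "error" event with the worker's name so that connection
// problems become visible instead of being silently dropped.
const attachErrorLogging = (
  name: string,
  emitter: ErrorEmitter,
  log: (line: string) => void = console.error,
): void => {
  emitter.on("error", (err) => {
    log(`[${name}] ${err.message}`);
  });
};
```

With the workers from this issue, that would be `attachErrorLogging("foo", foo.worker)` and so on for `bar` and `baz`.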

@nullndr
Contributor Author

nullndr commented Sep 10, 2024

I can confirm they are in the waiting status

@roggervalf
Collaborator

Hey @nullndr, could you connect to your Redis instances and run MONITOR? Let us know which commands are being executed while the waiting jobs are not processed.

@nullndr
Contributor Author

nullndr commented Sep 10, 2024

@roggervalf I will try. In the meantime I have downgraded to 5.1.12, and I'll test all versions to bisect the exact commit.

@manast
Contributor

manast commented Sep 10, 2024

> You can use Taskforce.sh or any other frontend to check which workers are online for a given queue.

> Sadly, I use AWS MemoryDB, which cannot be reached from outside AWS. I am thinking about writing a small script that uses Queue.getWorkers(); is this the correct way to check this?

This scenario is very common. If your Redis instance is isolated, you should use the Taskforce connector: https://github.com/taskforcesh/taskforce-connector

You can use getWorkers as you mention to get the list of online workers.
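
A small script along those lines might look like the sketch below. `WorkerSource` and `listOnlineWorkers` are hypothetical names, and the `addr` and `idle` fields are assumed to be present because they are standard fields of Redis `CLIENT LIST` entries; the exact set of fields returned may differ.

```typescript
// Hypothetical structural stand-in for the part of BullMQ's Queue used here.
interface WorkerSource {
  getWorkers(): Promise<Array<Record<string, string>>>;
}

// Returns one human-readable line per worker connection reported for the queue.
const listOnlineWorkers = async (queue: WorkerSource): Promise<string[]> => {
  const workers = await queue.getWorkers();
  return workers.map(
    (w) => `${w["addr"] ?? "unknown"} idle=${w["idle"] ?? "?"}s`,
  );
};
```

Running `console.log(await listOnlineWorkers(foo.queue))` from a host inside the AWS network would print the connected workers; an empty array means Redis reports no worker connections for that queue.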

@manast
Contributor

manast commented Sep 10, 2024

> I can confirm they are in the waiting status

And the workers are idling and online?

@nullndr
Contributor Author

nullndr commented Sep 11, 2024

I have been able to connect my AWS memorydb to taskforce.sh, but the dashboard shows no workers in any queue.

I think this is because I am missing the name option in WorkerOptions, since I had to downgrade bullmq to 5.1.12. I will try minor upgrades until I find the issue.

@manast
Contributor

manast commented Sep 11, 2024

It could also be that MemoryDB does not implement this command: https://redis.io/docs/latest/commands/client-setname/ although I could not find anything in the MemoryDB documentation saying that it is unsupported.
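
Whether the command works can also be checked empirically rather than from the documentation. The sketch below sets a connection name and reads it back. `CommandClient`, `supportsClientSetName`, and the probe name are hypothetical; the interface is assumed to match ioredis's generic `call()` method.

```typescript
// Hypothetical structural stand-in for a Redis client with a generic
// command method, such as ioredis's call().
interface CommandClient {
  call(command: string, ...args: string[]): Promise<unknown>;
}

// Sets a connection name and reads it back. Returns false if either command
// is rejected or the name does not round-trip. Note that any other failure
// (for example a dropped connection) is also reported as false.
const supportsClientSetName = async (
  client: CommandClient,
  probeName = "bullmq-probe",
): Promise<boolean> => {
  try {
    await client.call("CLIENT", "SETNAME", probeName);
    const name = await client.call("CLIENT", "GETNAME");
    return name === probeName;
  } catch {
    return false;
  }
};
```

Since this renames the connection it runs on, it is best run on a throwaway connection rather than one shared with a queue or worker.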
