Testing Event-Based Workflows Using Node.js and RabbitMQ

I work on a system that was built in the microservices architectural style. It uses RabbitMQ for asynchronous messaging among services. The idea has been very successful. Here are some of the benefits.

  1. We can run our services on Kubernetes and scale out when needed.
  2. We need not fear that a message will be lost if a service receives it and dies before processing it. RabbitMQ handles the delivery of messages and retries them as needed.
  3. The system handles partial failure like a pro.
  4. The metrics we get from RabbitMQ help us understand how the system is behaving. We can tell whether a worker is stuck by looking at the queue length, for example.
  5. Ease of testing...

I’m kidding: testing doesn’t get easier. It doesn’t get harder either. It’s similar to how you’d test systems that communicate in any other way. You can use the following techniques when testing a system that uses RabbitMQ (they would work for Redis Streams/PubSub too).

Depending on the level of testing you want, your tests will be more or less integrated as you trade off speed, setup cost, and confidence to various degrees.

Half integration with message schemas

Just as with unit testing, you can test your messages without involving RabbitMQ itself. Here you rely on your messages being values that you can inspect in a single process.

First, delegate interaction with the messaging system to a separate unit of your application. Then, in your tests, pass in a fake instance of that unit through which you can assert that the system under test produced a message having the schema you expect and addressed it to the correct queue/topic.

This unit that interacts with RabbitMQ can be tested and verified separately.
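Here is a sketch of that arrangement. The `WalletService`, `FakePublisher`, and message shape below are hypothetical, invented for illustration; they are not from the codebase discussed later in this article.

```typescript
// A narrow interface the application depends on for messaging.
interface Publisher {
  queue(queueName: string, message: unknown): Promise<void>;
}

// A fake that records messages instead of talking to RabbitMQ.
class FakePublisher implements Publisher {
  published: Array<{ queueName: string; message: unknown }> = [];

  async queue(queueName: string, message: unknown): Promise<void> {
    this.published.push({ queueName, message });
  }
}

// A hypothetical system under test that delegates messaging to Publisher.
class WalletService {
  constructor(private publisher: Publisher) {}

  async requestTierUpgrade(userId: string, tier: string): Promise<void> {
    await this.publisher.queue('wallet-upgrades', { id: userId, tier });
  }
}

// In a test, assert on the destination and the schema of what was produced.
async function demo(): Promise<void> {
  const publisher = new FakePublisher();
  const service = new WalletService(publisher);
  await service.requestTierUpgrade('user-1', 'tier_3');

  const [sent] = publisher.published;
  console.assert(sent.queueName === 'wallet-upgrades');
  console.assert((sent.message as { tier: string }).tier === 'tier_3');
}

demo();
```

Because the fake records every message in memory, the test can assert on both the destination queue and the payload schema without a broker anywhere in sight.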

Pros

  1. You do not need to set up the entire messaging system to run the tests.
  2. The tests will run faster because they’re all in memory in a single process.
  3. Your CI bill will be lower; the CFO will love you for it.

Cons

  1. You have only the confidence that you produce the messages you intend; you don’t know how consumers receive them.
  2. You may need a schema registry to share schemas across services.
  3. Schema changes are caught late, so a consumer may break without anyone noticing until much later.
  4. You need complementary tests – one on the producer service and another on the consumer service – to fully test a workflow. This splits the tests for a workflow across the services, which can be a bit more tedious.
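One way to keep those complementary tests honest is to share the message schema itself: the producer’s test and the consumer’s test validate against one definition, so drift fails on both sides. A minimal sketch (the `WalletUpgradeMessage` shape here is hypothetical):

```typescript
// A shared message schema that both the producer's and the consumer's
// test suites import, so a change on either side breaks both.
interface WalletUpgradeMessage {
  id: string;
  tier: string;
}

// Runtime type guard used in tests to validate a produced or received payload.
function isWalletUpgradeMessage(value: unknown): value is WalletUpgradeMessage {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  return typeof v.id === 'string' && typeof v.tier === 'string';
}

// Producer-side assertion: the message we are about to queue conforms.
console.assert(isWalletUpgradeMessage({ id: 'user-1', tier: 'tier_3' }));

// Consumer-side assertion: the bytes we pulled off the queue conform.
const received = JSON.parse('{"id":"user-1","tier":"tier_3"}');
console.assert(isWalletUpgradeMessage(received));
```

A dedicated schema registry or a validation library can replace the hand-written guard, but the principle is the same: one definition, validated from both ends.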

Full integration

Sometimes you’d prefer full integration with RabbitMQ in your tests, just as you’d choose integration tests over unit tests. It can be for any of several reasons:

  1. your code base is already designed that way;
  2. you want more confidence that dependent services and workers get along fine;
  3. you want to understand the tests for a workflow in one place; or
  4. you want to make your CFO sweat a bit.

These are all valid reasons for full integration, depending on who you ask.

In a full integration setup, you need to ensure that every service involved in a workflow is designed to cooperate in testing. The consumers and the message broker are run as part of the test setup. Then the producer’s tests are run. The tests queue messages as necessary and may listen for actions taken by the consumers. In this sense, the consumers are also tested, but the tests are usually written on the producer side.

An alternative setup is to run the producer, the consumer, and the message broker as part of the setup. That is, bring up everything as a normal system. Then the tests are written in a separate project to run against the system. I prefer this approach for acceptance tests because it presents the tests from the perspective of an external actor and reduces the chances of depending on internals of the program.
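For concreteness, the “bring up everything as a normal system” setup might look like a Docker Compose file along these lines. The service names, images, and build paths are illustrative, not from the project discussed in this article.

```yaml
services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"

  # Hypothetical producer service
  wallet-service:
    build: ./wallet-service
    environment:
      NODE_ENV: test
      AMQP_URL: amqp://rabbitmq:5672
    depends_on:
      - rabbitmq

  # Hypothetical consumer/worker
  wallet-worker:
    build: ./wallet-worker
    environment:
      NODE_ENV: test
      AMQP_URL: amqp://rabbitmq:5672
    depends_on:
      - rabbitmq
```

The acceptance-test project then runs on the host (or in another container) and talks to the system the way any external client would.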

A downside of full integration is that your tests now have another reason to fail. You may have to write extra code to work with RabbitMQ in tests. I’ll show an example of that in this article.

Full integration example

As an example, I’ll show snippets of a test suite I worked on. We have wallets for users’ accounts. Each wallet has a tier that determines how much the user can hold or spend. We allow only tier-3 users to change their daily limits. Here’s a test suite for that.

describe('Customer account tier operations', () => {
  let sequencer: AMQPMessageSequencer;

  beforeAll(async () => {
    sequencer = new AMQPMessageSequencer({
      eventName: TestEvents.ACCOUNT_TIER_UPGRADE_PROCESSED,
      exchange: Exchanges.test,
      queueName: TestEvents.ACCOUNT_TIER_UPGRADE_PROCESSED,
      forwardedEventName: (msg: ConsumeMessage) =>
        JSON.parse(msg.content.toString()).wallet
    });
    await sequencer.start();
  });

  afterAll(() => sequencer.stop());

  test.each(['tier_0', 'tier_1', 'tier_2', 'tier_4'])(
    'changing account tier limits fails for non-tier-3 customers',
    async (tier) => {
      // snip
    }
  );

  test('changing account tier limits succeeds for tier-3 customers', async () => {
    const wallet = await createWallet('tier_3', sequencer);
    // snip
  });
});

I have redacted the parts of the test suite that are not relevant to this article. Let’s dig into this snippet, starting from the bottom.

const wallet = await createWallet('tier_3', sequencer);

This part tells us that the test case first creates a tier-3 wallet, then goes on with its business. But what’s the deal with createWallet and sequencer? Here’s the body of createWallet explaining itself.

async function createWallet(tier: Tier, sequencer: AMQPMessageSequencer) {
  const { userId } = await fakeWallet({ request });

  await sequencer.once(userId, () =>
    publisher.queue(walletUpgradeQueue, {
      id: userId,
      tier
    })
  );

  return getWallet(userId);
}

It creates a fake wallet, uses this sequencer thing to interact with a queue, waits for it to finish (it’s await-ed), then fetches the wallet again. It fetches the wallet again because whatever listens on that queue changes the wallet.

Designing for tests

Let’s look at the worker that listens on walletUpgradeQueue.

export default async function updateWalletTier(message: ConsumeMessage) {
  const { id, tier } = JSON.parse(
    message.content.toString()
  );

  try {
    // snip
    await publishTestEvent(TestEvents.ACCOUNT_TIER_UPGRADE_PROCESSED, {
      wallet: id,
      tier
    });
  } catch (err) {
    // snip
  }
}

updateWalletTier, as you’d expect, updates the wallet tier. Because we want to test against a wallet at tier 3, we send it a message after creating the fake wallet. This helps us bypass the rigorous process of creating a tier-3 wallet on a normal day. But notice what it does at the end. It publishes a test event. As I mentioned earlier, you need to ensure that every service involved in a workflow is designed to cooperate in testing. Just as you use dependency injection to make your classes and functions testable, you can publish test events to make your event-based workflows testable. Here’s the body of publishTestEvent for your viewing pleasure.

export async function publishTestEvent(eventName: string, data: any) {
  process.env.NODE_ENV === Test &&
    (await publisher.emit(Exchanges.test, eventName, data, {
      persistent: false
    }));
}

It publishes a non-persistent event in test environments. Non-persistent messages are never written to disk, so a message that was left behind, acknowledged or not, won’t survive the broker between runs. That prevents later tests from receiving messages from a previous test.

Dealing with a flowing stream of messages

Let’s now deal with the sequencer business. One of our test cases is parametrised.

test.each(['tier_0', 'tier_1', 'tier_2', 'tier_4'])(
  // snip
);

Jest runs the test callback once per parameter. The test callback uses the queue to create an account at each tier (the parameter), so we put four messages on the queue. The problem is that only one consumer holds the queue at any time. The first consumer, created when we run with tier_0, holds on to the queue in the background and reads every subsequent message off it. That stalls the tests for the other parameters, causing them to time out.

To solve this problem we need to consume messages off the queue one at a time. We could create the consumer when each test starts and destroy it once we’re done, but that didn’t work well with the beforeEach setup procedure. I guess Jest sees the entire test.each as a single test case (I may be wrong on this; I was too tired of debugging to verify).

This is how I settled on the sequencer business. That object consumes messages off the queue continuously and republishes each one as a Node.js event on an internal event emitter. The name of the event comes from passing the message to the forwardedEventName parameter. The sequencer also provides a method named once that accepts an event name and a callback that will trigger that event; it then listens once for that event while the callback runs. The event name must match whatever the forwardedEventName parameter of the AMQPMessageSequencer class produces. If you follow the test from createWallet to publishTestEvent in updateWalletTier, you’ll see that the user ID we pass to the queue is what ends up as wallet in

new AMQPMessageSequencer({
  // snip
  forwardedEventName: (msg: ConsumeMessage) => JSON.parse(msg.content.toString()).wallet
});

Basically, we use sequencer to read one message at a time so that each test reads only the message it wants. If we queue a message only when we want, and can read only that message off the queue, then the stream of messages won’t clobber our tests. And because each test message is non-persistent, we won’t get stale messages. You can find the source code for AMQPMessageSequencer in the appendix.

Summary

Testing event-based workflows can be done in two styles: half integration or full integration.

Half integration corresponds to unit tests. It is cheap to set up but requires complementary tests to cover the entire workflow. Full integration corresponds to integration tests. It requires more setup, but it catches breaking changes between producers and consumers sooner.

To work with full integration, you need to design your services for testing. Emitting test events in the right places is a good way to start. Publish test events as non-persistent messages so that unacked messages won’t hang around and stall later tests. When using RabbitMQ and Node.js, think about how to consume messages one at a time so that the stream of messages from the broker doesn’t clobber your tests.

I hope this article helps you test your event-based workflows more effectively. You are welcome to comment and start a discussion around it. I appreciate your feedback because I learn from it. If you’ve read this far, thank you for reading. Until next time.

Appendix

AMQPMessageSequencer source code
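The original listing is not reproduced in this excerpt. Based on the description in the article, a minimal sketch of such a sequencer could look like the following. Note the assumptions: the AmqpChannel interface mirrors only the subset of amqplib’s channel API we need, and the channel is injected here to keep the sketch self-contained, whereas the real class presumably manages its own connection inside start.

```typescript
import { EventEmitter } from 'node:events';

// Minimal shapes for what we need from an AMQP client.
// amqplib's channel API matches these method signatures.
interface ConsumeMessage { content: Buffer; }
interface AmqpChannel {
  assertExchange(name: string, type: string, opts: { durable: boolean }): Promise<unknown>;
  assertQueue(name: string, opts: { durable: boolean }): Promise<unknown>;
  bindQueue(queue: string, exchange: string, pattern: string): Promise<unknown>;
  consume(queue: string, onMessage: (msg: ConsumeMessage | null) => void): Promise<unknown>;
  ack(msg: ConsumeMessage): void;
}

interface SequencerOptions {
  eventName: string;
  exchange: string;
  queueName: string;
  forwardedEventName: (msg: ConsumeMessage) => string;
}

class AMQPMessageSequencer {
  private emitter = new EventEmitter();

  constructor(private options: SequencerOptions, private channel: AmqpChannel) {}

  // Bind the test queue to the test exchange and forward each broker
  // message as a local Node.js event named by forwardedEventName.
  async start(): Promise<void> {
    const { exchange, queueName, eventName, forwardedEventName } = this.options;
    await this.channel.assertExchange(exchange, 'topic', { durable: false });
    await this.channel.assertQueue(queueName, { durable: false });
    await this.channel.bindQueue(queueName, exchange, eventName);
    await this.channel.consume(queueName, (msg) => {
      if (!msg) return;
      this.channel.ack(msg);
      this.emitter.emit(forwardedEventName(msg), msg);
    });
  }

  // Run `trigger` (e.g. queue a message), then resolve once the single
  // matching event arrives, so each test sees only its own message.
  once(name: string, trigger: () => unknown): Promise<ConsumeMessage> {
    return new Promise((resolve, reject) => {
      const timer = setTimeout(
        () => reject(new Error(`timed out waiting for ${name}`)),
        10_000
      );
      this.emitter.once(name, (msg: ConsumeMessage) => {
        clearTimeout(timer);
        resolve(msg);
      });
      Promise.resolve(trigger()).catch(reject);
    });
  }

  // The real class would also close its channel and connection here.
  stop(): void {
    this.emitter.removeAllListeners();
  }
}
```

In the test suite above, forwardedEventName picks a stable key out of each message (the wallet’s user ID), and once ties one published message to the one test waiting for it.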