Olli Pöyry

Software Architect, M.Sc. (Accounting and Finance)

The code described below provides a fast feedback loop from the server back to the client, even when multiple threads are sending asynchronous requests concurrently.

  • Each asynchronous request embeds a unique identifier that is passed back through a [Redis pub/sub channel](http://redis.io/topics/pubsub)

  • The Spring bean ResponseWaitUtil starts a background thread on the client side that runs a Redis pub/sub listener

  • java.util.concurrent.CountDownLatch is used to make the client thread wait for the response

  • The ResponseWaitUtil bean contains a ConcurrentHashMap that maps each waiting request id to its CountDownLatch

  • When a response is received through the Redis channel, the corresponding latch is counted down, which releases the waiting thread
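The steps above can be tried out without Redis. The following is only a minimal sketch under stated assumptions: an in-memory BlockingQueue stands in for the Redis pub/sub channel, and the class and method names (LatchRegistrySketch, register, publish, awaitResponse) are hypothetical, not part of the original ResponseWaitUtil.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for ResponseWaitUtil. Assumption: an in-memory queue
// plays the role of the Redis pub/sub channel.
class LatchRegistrySketch {
    private final Map<String, CountDownLatch> latches = new ConcurrentHashMap<>();
    private final BlockingQueue<String> channel = new LinkedBlockingQueue<>();
    private final Thread listener;

    LatchRegistrySketch() {
        // Background listener: waits for request ids and releases the matching latch.
        listener = new Thread(() -> {
            try {
                while (true) {
                    String requestId = channel.take();
                    CountDownLatch latch = latches.remove(requestId);
                    if (latch != null) {
                        latch.countDown();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop() was called
            }
        });
        listener.setDaemon(true);
        listener.start();
    }

    // Must be called before the asynchronous request is sent.
    CountDownLatch register(String requestId) {
        CountDownLatch latch = new CountDownLatch(1);
        latches.put(requestId, latch);
        return latch;
    }

    // Simulates the server publishing the request id back to the client.
    void publish(String requestId) {
        channel.add(requestId);
    }

    // Returns true if the response arrived in time. On timeout the latch is
    // unregistered so that the map does not grow without bound.
    boolean awaitResponse(String requestId, CountDownLatch latch, long timeoutMs) {
        try {
            boolean arrived = latch.await(timeoutMs, TimeUnit.MILLISECONDS);
            if (!arrived) {
                latches.remove(requestId);
            }
            return arrived;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            latches.remove(requestId);
            return false;
        }
    }

    int pending() {
        return latches.size();
    }

    void stop() {
        listener.interrupt();
    }
}
```

Removing the latch on timeout is a design choice of this sketch. Without it, requests that never receive a response would leave their entries in the map.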

    @Value("${wait.for.response.sec:30}")
    int waitForResponseSec;
    @Resource
    ResponseWaitUtil responseWaitUtil;

    final String requestId = testUtil.getNextBusinessId();
    CountDownLatch countDownLatch = responseWaitUtil.addLatchForRequest(requestId);
    sampleResult.sampleStart(); // Apache JMeter start timer

    // Execute ASYNC call e.g. something like:
    executeUpload(uploadRequest, requestId);

    LOG.debug("Waiting {} seconds for response from backend on Redis pub/sub channel {}, Redis URL {}...",
            waitForResponseSec, ResponseWaitUtil.REDIS_CHANNEL_NAME, responseWaitUtil.getRedisUrl());
    if (countDownLatch.await(waitForResponseSec, TimeUnit.SECONDS)) {
        sampleResult.sampleEnd(); // Apache JMeter stop timer
        ...
    } else {
        sampleResult.setSuccessful(false);
        ...
    }

Before an asynchronous request is sent, a CountDownLatch is created and added to the map mapRequestIdToLatch. When a response arrives, the JedisPubSub listener defined in ResponseWaitUtil calls countDown() on the latch mapped to the request id carried in the message.

What happens if the feedback loop from the server is lightning fast and the latch is released before the client thread reaches countDownLatch.await? No worries: the latch count is then already zero, so await returns true immediately and the program continues without waiting.
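This behaviour of CountDownLatch can be checked in isolation. The sketch below is illustrative only; the class and method names are hypothetical. It counts the latch down first and calls await afterwards.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows that await() does not block when
// countDown() has already brought the count to zero.
class EarlyCountDownSketch {

    static boolean awaitAfterCountDown() {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown(); // the "response" arrives before the client waits
        try {
            // The count is already zero, so this returns true without waiting 30 seconds.
            return latch.await(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The guarantee only holds because the latch is registered before the request is sent. If the latch were created after the call, a fast response could arrive while no latch exists for its request id.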

    @Component
    public class ResponseWaitUtil {

        @Value("${redis.url:localhost}")
        String redisUrl;

        public static final String REDIS_CHANNEL_NAME = ResponseWaitUtil.class.getName();

        Map<String, CountDownLatch> mapRequestIdToLatch = new ConcurrentHashMap<String, CountDownLatch>();

        public CountDownLatch addLatchForRequest(String requestId) {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            mapRequestIdToLatch.put(requestId, countDownLatch);
            return countDownLatch;
        }

        final JedisPubSub jedisPubSub = new JedisPubSub() {

            @Override
            public void onMessage(String channel, String message) {
                if (StringUtils.equals(channel, REDIS_CHANNEL_NAME)) {
                    // The request id is everything before the optional "+" separator
                    String requestId = StringUtils.substringBefore(message, "+");
                    // remove() looks up and unregisters the latch in one atomic step
                    CountDownLatch countDownLatch = mapRequestIdToLatch.remove(requestId);
                    if (countDownLatch != null) {
                        LOG.info("Redis pub/sub onMessage got message {}, continue thread", message);
                        if (StringUtils.contains(message, "+")) {
                            // Store the extra information before releasing the waiting thread
                            mapRequestIdToMessage.put(requestId, message);
                        }
                        countDownLatch.countDown();
                    } else {
                        LOG.warn("SYSTEM ERROR latch not found for request id {}, mapRequestIdToLatch keyset {}", requestId, mapRequestIdToLatch.keySet());
                    }
                }
            }
        };

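The pub/sub listener in ResponseWaitUtil is started in a background thread on the client before any asynchronous requests are executed. A dedicated thread is needed because jedis.subscribe blocks the calling thread until the listener unsubscribes.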

    ExecutorService executor;
    Future<?> future;

    public synchronized void init() {
        if (future == null) {
            LOG.debug("Start background thread for getting responses from backend through Redis");
            executor = Executors.newFixedThreadPool(1);
            future = executor.submit(redisListenerTask);
        }
    }

    public synchronized void destroy() {
        if (future != null) {
            try {
                jedisPubSub.unsubscribe();
                executor.shutdownNow();
                future = null;
                executor = null;
                LOG.debug("Background thread for getting responses through Redis closed");
            } catch (Exception ex) {
                LOG.error("Error in destroying Redis listener", ex);
            }
        }
    }

    class RedisListenerTask implements Runnable {

        public void run() {
            Jedis jedis = null;
            try {
                jedis = new Jedis(redisUrl);
                // Blocks this thread until jedisPubSub.unsubscribe() is called
                jedis.subscribe(jedisPubSub, REDIS_CHANNEL_NAME);
            } catch (Exception ex) {
                LOG.error("Error", ex);
            } finally {
                if (jedis != null) {
                    jedis.quit();
                    jedis = null;
                }
            }
        }
    }

The unique id for the asynchronous request is built by a shared resource (e.g. a Spring bean) that includes a counter value in the returned request id string.

    static String receiverBeginning = "FI" + RandomStringUtils.randomNumeric(3);
    static String businessIdPostfix = RandomStringUtils.randomNumeric(1);

    public String getNextBusinessId() {
        int counterValue = businessIdCounter.incrementAndGet();
        return receiverBeginning + StringUtils.leftPad(String.valueOf(counterValue), 4, "0") + businessIdPostfix;
    }

The request id could just as well be generated with something like UUID.randomUUID().toString().
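Both approaches can be sketched with the standard library alone. The names below (RequestIdSketch, nextCounterId, nextUuidId) and the "REQ" prefix are assumptions made for illustration, not the identifiers used in the original test utility.

```java
import java.util.Locale;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper showing two ways to produce unique request ids.
class RequestIdSketch {
    // Shared counter; incrementAndGet() is safe to call from many threads.
    private static final AtomicInteger COUNTER = new AtomicInteger();

    // Counter-based id: a prefix followed by a counter zero-padded to four digits.
    static String nextCounterId(String prefix) {
        return prefix + String.format(Locale.ROOT, "%04d", COUNTER.incrementAndGet());
    }

    // Random id: 36 characters made of hex digits and hyphens.
    static String nextUuidId() {
        return UUID.randomUUID().toString();
    }
}
```

A UUID string contains only hex digits and hyphens, so it does not collide with the ":" and "+" separators used in the callback string and the response message.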

On the server side, the service responding to the asynchronous request picks the Redis host name and the pub/sub channel name from the request message, which embeds a string like the one below somewhere in its content:

<ns2:StreetName>[CALLBACK REDIS:localhost:ResponseWaitUtil:FI68400018]</ns2:StreetName>

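Notice that the Redis host is provided in the request, so it does not need to be configured on the server side. The name of the pub/sub channel is embedded in the callback string as well. Because executeRedisPub, shown further down, splits the string on ":" and expects exactly four parts, only a bare host name works as written; a host and port combination would need different parsing.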
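The callback string can be taken apart with plain string operations. The following sketch assumes the four-part format REDIS:host:channel:requestId from the example above; the class name CallbackSketch and its fields are hypothetical.

```java
// Hypothetical value class for the parts of a "[CALLBACK REDIS:host:channel:requestId]" marker.
class CallbackSketch {
    final String host;
    final String channel;
    final String requestId;

    private CallbackSketch(String host, String channel, String requestId) {
        this.host = host;
        this.channel = channel;
        this.requestId = requestId;
    }

    // Finds the marker anywhere in the request message and splits it on ':'.
    static CallbackSketch parse(String requestMessage) {
        String startMarker = "[CALLBACK ";
        int start = requestMessage.indexOf(startMarker);
        int end = start < 0 ? -1 : requestMessage.indexOf(']', start);
        if (start < 0 || end < 0) {
            throw new IllegalArgumentException("No callback marker found in request");
        }
        String body = requestMessage.substring(start + startMarker.length(), end);
        String[] parts = body.split(":");
        if (parts.length != 4 || !"REDIS".equals(parts[0])) {
            throw new IllegalArgumentException(
                    "Expected REDIS:host:channel:requestId but got " + body);
        }
        return new CallbackSketch(parts[1], parts[2], parts[3]);
    }
}
```

With the sample element shown earlier, parse yields the host localhost, the channel ResponseWaitUtil and the request id FI68400018.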

Depending on the service, some additional information may be sent to the client by appending a string after a separator character (here +), for instance a database primary key generated in the service.

    String restAddressToCallback = StringUtils.substringBetween(requestMessage, "[CALLBACK ", "]");
    executeRedisPub(restAddressToCallback + (databaseId != null ? "+" + databaseId : ""));

    public void executeRedisPub(String url) throws Exception {
        Jedis jedis = null;
        try {
            // Expected parts: REDIS, host, topic, request id (optionally followed by "+" and extra data)
            String[] arr = StringUtils.split(url, ":");
            if (arr.length != 4) {
                throw new Exception("SYSTEM ERROR, callback URL for Redis should be of format REDIS:host:topic:requestid, erroneous url was " + url);
            }
            jedis = new Jedis(arr[1]);
            jedis.publish(arr[2], arr[3]);
            LOG.debug(">>Publish request id {} to Redis on host {} using topic {}", arr[3], arr[1], arr[2]);
        } finally {
            if (jedis != null) {
                jedis.quit();
            }
        }
    }

The JedisPubSub onMessage listener in ResponseWaitUtil stores responses that carry extra information in a ConcurrentHashMap, where they can be queried. getMessageByRequestId below has the side effect of removing the entry when it is queried. The actual payload (e.g. a database row id) of the returned extra information can be extracted with StringUtils.substringAfter(message, "+").

    Map<String, String> mapRequestIdToMessage = new ConcurrentHashMap<String, String>();

    // Populate map in JedisPubSub onMessage:
    if (StringUtils.contains(message, "+")) {
        mapRequestIdToMessage.put(requestId, message);
    }

    public String getMessageByRequestId(String requestId) {
        return mapRequestIdToMessage.remove(requestId);
    }
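Splitting a response message into request id and payload can be illustrated without Commons Lang. This sketch uses hypothetical names (ResponseMessageSketch, requestIdOf, payloadOf) and mirrors what StringUtils.substringBefore and StringUtils.substringAfter do for the "+" separator.

```java
// Hypothetical helper: splits "requestId+payload" messages on the first '+'.
class ResponseMessageSketch {

    // Everything before the first '+', or the whole message if there is none.
    static String requestIdOf(String message) {
        int sep = message.indexOf('+');
        return sep < 0 ? message : message.substring(0, sep);
    }

    // Everything after the first '+', or an empty string if there is none.
    static String payloadOf(String message) {
        int sep = message.indexOf('+');
        return sep < 0 ? "" : message.substring(sep + 1);
    }
}
```

For a message such as FI68400018+4711, the request id is FI68400018 and the payload is 4711. A message without a separator has an empty payload.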

A Redis pub/sub channel is faster and easier to configure as a feedback channel than alternatives like JMS or an HTTP connection, because the flow is from the server side to the client.