c# - .NET RabbitMQ client Subscriber.Next hangs
I am using the RabbitMQ .NET client in a Windows service. We have millions of messages coming in; they are processed in bulk and the output is put on a queue. I am creating the connection factory with a heartbeat of 30 and re-creating the connection whenever the connection or the subscriber is lost. In production, this code probably works in most cases. However, in my integration tests, I know it is failing most of the time. Here is the code:
    public void ReceiveAll(Func<IDictionary<ulong, byte[]>, IOnStreamWatchResult> onReceiveAllCallback, int batchSize, CancellationToken cancellationToken)
    {
        IModel channel = null;
        Subscription subscription = null;
        while (!cancellationToken.IsCancellationRequested)
        {
            if (subscription == null || subscription.Model.IsClosed)
            {
                channel = _channelFactory.CreateChannel(ref _connection, _messageQueueConfig, _connectionFactory);
                // Instructs the channel not to prefetch more than the batch count from the shared queue
                channel.BasicQos(0, Convert.ToUInt16(batchSize), false);
                subscription = new Subscription(channel, _messageQueueConfig.Queue, false);
            }
            try
            {
                BasicDeliverEventArgs message;
                var dequeuedMessages = new Dictionary<ulong, byte[]>();
                do
                {
                    if (subscription.Next(_messageQueueConfig.DequeueTimeout.Milliseconds, out message))
                    {
                        if (message == null)
                        {
                            // This means the channel was closed and the messages in the shared queue were moved to the ready state
                            DisposeChannelAndSubcription(ref channel, ref subscription);
                            ReceiveAll(onReceiveAllCallback, batchSize, cancellationToken);
                        }
                        else
                        {
                            dequeuedMessages.Add(message.DeliveryTag, message.Body);
                        }
                    }
                } while (message != null && batchSize > dequeuedMessages.Count && !cancellationToken.IsCancellationRequested);
                if (cancellationToken.IsCancellationRequested)
                {
                    if (dequeuedMessages.Any())
                    {
                        NackUnprocessedMessages(subscription, dequeuedMessages.Keys);
                    }
                    DisposeChannelAndSubcription(ref channel, ref subscription);
                    dequeuedMessages.Clear();
                    break;
                }
                try
                {
                    var onStreamWatchResult = onReceiveAllCallback(dequeuedMessages);
                    AckProcessedMessages(subscription, onStreamWatchResult.Processed);
                    NackUnprocessedMessages(subscription, onStreamWatchResult.Unprocessed);
                    dequeuedMessages.Clear();
                }
                catch (Exception unhandledException)
                {
                    NackUnprocessedMessages(subscription, dequeuedMessages.Keys);
                }
            }
            catch (EndOfStreamException endOfStreamException)
            {
                DisposeChannelAndSubcription(ref channel, ref subscription);
            }
            catch (OperationInterruptedException operationInterruptedException)
            {
                DisposeChannelAndSubcription(ref channel, ref subscription);
            }
        }
    }
The batch size is set to 4 because I put 4 messages on the queue in the integration test, and the Windows service is run after running the unit tests.
The issue here is always the same: the subscriber pre-fetches 4 messages, as expected, and returns true for the first 2 .Next iterations, but after that it returns false. I believe this is happening because the messages are not getting unacked properly. In the integration test, I ack 2 and nack 2 messages, and then read the 2 nacked messages again to clear the queue. However, after nacking, the messages are not returned to the ready state, and hence the test hangs. What am I doing wrong here? Am I not understanding the nacking documentation? Here is the nacking code:
    subscription.Model.BasicNack(deliveryTag, false, true);
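One detail in the receive loop that may be worth checking, separately from the nack call: the timeout passed to Next is read from DequeueTimeout.Milliseconds. On a TimeSpan, Milliseconds is only the 0-999 millisecond component, while TotalMilliseconds is the whole duration, and Next returns false when the timeout expires before a delivery arrives. The sketch below only illustrates that difference; the 5-second value and the ToTimeoutMillis helper are assumptions made for the example, since the post does not show how DequeueTimeout is configured.

```csharp
using System;

public static class DequeueTimeoutSketch
{
    // Hypothetical helper (not part of the code above): converts a TimeSpan
    // into a whole-duration millisecond count suitable for an int timeout.
    public static int ToTimeoutMillis(TimeSpan timeout)
    {
        // checked makes an out-of-range duration throw instead of wrapping.
        return checked((int)timeout.TotalMilliseconds);
    }

    public static void Main()
    {
        // Assumed value; the real DequeueTimeout is not shown in the post.
        TimeSpan dequeueTimeout = TimeSpan.FromSeconds(5);

        // Milliseconds is only the sub-second component of the TimeSpan.
        Console.WriteLine(dequeueTimeout.Milliseconds);      // prints 0

        // TotalMilliseconds is the full duration expressed in milliseconds.
        Console.WriteLine(dequeueTimeout.TotalMilliseconds); // prints 5000

        Console.WriteLine(ToTimeoutMillis(dequeueTimeout));  // prints 5000
    }
}
```

If DequeueTimeout happens to be a whole number of seconds, the Milliseconds property would yield 0, so the loop could give up on a partially filled batch almost immediately. Whether that applies here depends on the configured value.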