c# - .Net RabbitMQ client Subscriber.Next hangs -


i using rabbitmq .net client in windows service. have millions of messages coming in bulk processed , output put on queue. creating connection factory heartbeat of 30 , creating connection whenever connection or subscriber lost. in production, code probably works in cases. however, in integration tests, know failing of time. here code:

public void receiveall(func<idictionary<ulong, byte[]>, ionstreamwatchresult> onreceiveallcallback, int batchsize, cancellationtoken cancellationtoken) {     imodel channel = null;     subscription subscription = null;      while (!cancellationtoken.iscancellationrequested)     {         if (subscription == null || subscription.model.isclosed)         {             channel = _channelfactory.createchannel(ref _connection, _messagequeueconfig, _connectionfactory);             // instructs channel not prefetch more batch count shared queue             channel.basicqos(0, convert.touint16(batchsize), false);             subscription = new subscription(channel, _messagequeueconfig.queue, false);         }          try         {             basicdelivereventargs message;             var dequeuedmessages = new dictionary<ulong, byte[]>();                         {                 if (subscription.next(_messagequeueconfig.dequeuetimeout.milliseconds, out message))                 {                     if (message == null)                     {                         // means channel closed , messages in shared queue moved ready state                         disposechannelandsubcription(ref channel, ref subscription);                         receiveall(onreceiveallcallback, batchsize, cancellationtoken);                     }                     else                     {                         dequeuedmessages.add(message.deliverytag, message.body);                     }                 }             } while (message != null && batchsize > dequeuedmessages.count && !cancellationtoken.iscancellationrequested);              if (cancellationtoken.iscancellationrequested)             {                 if (dequeuedmessages.any())                 {                     nackunprocessedmessages(subscription, dequeuedmessages.keys);                 }                 disposechannelandsubcription(ref channel, ref subscription);                 dequeuedmessages.clear();                 break;             }              try             {                 var onstreamwatchresult = onreceiveallcallback(dequeuedmessages);                 ackprocessedmessages(subscription, onstreamwatchresult.processed);                 nackunprocessedmessages(subscription, onstreamwatchresult.unprocessed);                 dequeuedmessages.clear();             }             catch(exception unhandledexception)             {                 nackunprocessedmessages(subscription, dequeuedmessages.keys);             }         }         catch (endofstreamexception endofstreamexception)         {             disposechannelandsubcription(ref channel, ref subscription);         }         catch (operationinterruptedexception operationinterruptedexception)         {             disposechannelandsubcription(ref channel, ref subscription);         }     } } 

the batch size set 4 because put 4 messages in integration test, windows service ran after running unit tests.

the issue here always, subscriber pre-fetches 4 messages, expected, returns true first 2 .next iterations, after returns false. believe happening because messages not getting unacked properly. in integration test, ack 2 , nack 2 messages , read 2 nacked messages again clear queue. however, after nacking, messages not returned ready state , hence test hangs. doing wrong here? not understanding nacking documentation? here nacking code:

subscription.model.basicnack(deliverytag, false, true); 


Comments

Popular posts from this blog

javascript - Karma not able to start PhantomJS on Windows - Error: spawn UNKNOWN -

Nuget pack csproj using nuspec -

c# - Display ASPX Popup control in RowDeleteing Event (ASPX Gridview) -