threads - threads not finished as expected

Posted on Tue Dec 2 14:59:55 2008 by hongwu
threads not finished as expected
hello, i wrote a program which has two threads. when it runs for a few minutes, every thing is ok, two threads could exit properly when i "ctl-c". (the unsubscribe function:
$aReceiver->unsubscribe({ destination => "/queue/$queue2"}); $sReceiver->unsubscribe({ destination => "/queue/$queue1"});
in two threads could be executed without problem) but when it runs for a long time, one hour, for example, when i "ctl-c", the
$aReceiver->unsubscribe({ destination => "/queue/$queue2"}); $sReceiver->unsubscribe({ destination => "/queue/$queue1"});
could not be executed. I do not know what is the problem, could you please analyze it? thanks! here goes the program: --------------------------------------------------------------------
#!/usr/bin/perl -w use strict; use Switch; use Time::HiRes qw(usleep ualarm gettimeofday tv_interval); use Net::Stomp; use Term::ANSIColor; #for multithread use threads; use threads::shared; #for ActiveMQ connection my $aq_host = "10.255.1.130"; my $aq_port = '61613'; my $sSender; my $sReceiver; my $aReceiver; my $queue1 = "queue1"; my $queue2 = "queue2"; my $running : shared = 1; my $thread_th1; my $thread_th2; sub cleanup() { print colored ("\n.STOP received! \n", 'red'); $running=0; print ".Stopping Thread1 ...... "; my $r1 = $thread_th1->join(); print colored ("$r1\n", 'green'); print ".Stopping Thread2 ...... "; my $r2 = $thread_th2->join(); print colored ("$r2\n", 'green'); print ".Disconnecting ActiveMQ connector ...... "; $sReceiver->disconnect(); $aReceiver->disconnect(); print colored ("OK\n", 'green'); print "\n"; print colored ("ALL Nicely Stopped, see you!\n\n", 'yellow'); } sub th1() { $sReceiver->subscribe({ destination => "/queue/$queue1", 'ack' => 'client', 'activemq.prefetchSize' => 1}); while($running){ if ($sReceiver->can_read({timeout=>'1'})) { my $frame = $sReceiver->receive_frame; my $body = $frame->body; #print "get message from queue 1: $body \n"; ptMsg($body); #Ack msg for deleting after the procedure $sReceiver->ack({ frame => $frame}); } } $sReceiver->unsubscribe({ destination => "/queue/$queue1"}); return "OK"; } sub th2() { $aReceiver->subscribe({ destination => "/queue/$queue2", 'ack' => 'client', 'activemq.prefetchSize' => 1}); while($running) { #get one message from queue if ($aReceiver->can_read({timeout=>'1'})) { my $frame = $aReceiver->receive_frame; my $body = $frame->body; #print "get message from queue 2: $body"; ptMsg($body); $aReceiver->ack({ frame => $frame}); } } #when received cleanup message $aReceiver->unsubscribe({ destination => "/queue/$queue2"}); return "OK"; } sub ptMsg { my $msg = shift; print "received message $msg \n"; } ################################################################# # main function start ################################################################# print ".Creating ActiveMQ connector ...... "; $sReceiver = Net::Stomp->new({ hostname => $aq_host, port => $aq_port}); $sReceiver->connect(); $aReceiver = Net::Stomp->new({ hostname => $aq_host, port => $aq_port}); $aReceiver->connect(); print colored ("OK\n",'green'); $SIG{INT} = "cleanup"; print ".Starting Thread1 ...... "; $thread_th1 = threads->create(\&th1); print colored ("OK\n", 'green'); print ".Starting Thread2 ...... "; $thread_th2 = threads->create(\&th2); print colored ("OK\n", 'green'); #main process waiting for signal clean print colored ("->running ......\n", 'green'); print "\n"; while ($running) { usleep(5000000); }
Direct Responses: 9479 | Write a response