diff --git a/fhem/contrib/PRESENCE/collectord b/fhem/contrib/PRESENCE/collectord index cd58c2348..c22c62e61 100755 --- a/fhem/contrib/PRESENCE/collectord +++ b/fhem/contrib/PRESENCE/collectord @@ -52,7 +52,7 @@ my $buf; my $opt_d; my $opt_h; -my $opt_v; +my $opt_v = 0; my $opt_p = 5222; my $opt_P = "/var/run/".basename($0).".pid"; my $opt_l; @@ -73,7 +73,7 @@ my $uuid; Getopt::Long::Configure('bundling'); GetOptions( "d" => \$opt_d, "daemon" => \$opt_d, - "v" => \$opt_v, "verbose" => \$opt_v, + "v+" => \$opt_v, "verbose+" => \$opt_v, "l=s" => \$opt_l, "logfile=s" => \$opt_l, "c=s" => \$opt_c, "configfile=s" => \$opt_c, "p=i" => \$opt_p, "port=i" => \$opt_p, @@ -110,7 +110,7 @@ sub print_usage () { print " -d, --daemon\n"; print " detach from terminal and run as background daemon\n"; print " -v, --verbose\n"; - print " Print detailed log output\n"; + print " Print detailed log output (can be used multiple times to increase the loglevel, max. 2 times)\n"; print " -l, --logfile \n"; print " log to the given logfile\n"; print " -h, --help\n"; @@ -225,11 +225,10 @@ while(1) foreach $uuid (keys %state) { my %handle_to_socket = reverse %socket_to_handle; - unless(defined($handle_to_socket{$uuid})) + unless(exists($handle_to_socket{$uuid})) { - print timestamp()."cleaning up status values (UUID: $uuid)\n" if($opt_v); + print timestamp()."cleaning up status values (UUID: $uuid)\n" if($opt_v >= 2); delete $state{$uuid}; - delete $socket_to_handle{$handle_to_socket{$uuid}}; } } @@ -289,7 +288,7 @@ while(1) while($log_queue->pending) { $logline = $log_queue->dequeue; - print timestamp().$logline if($opt_v); + print timestamp().$logline if($opt_v >= 2); $logline = undef; } @@ -325,7 +324,7 @@ while(1) { # send the acknowledgment back to the sender $client->send("command accepted\n"); - print timestamp()."received new command from ".$client->peerhost().":".$client->peerport()." - $buf\n" if($opt_v); + print timestamp()."received new command from ".$client->peerhost().":".$client->peerport()." - $buf\n" if($opt_v >= 2); # Split the message into bluetooth address and the timeout value # (timeout is ignored within the collectord, as it is given by configuration) @@ -343,9 +342,8 @@ while(1) my $temp = $handle{$uuid}{threads}; foreach $room (keys %$temp) { - print timestamp()."killing thread ".$handle{$uuid}{threads}{$room}->tid()." for room $room for client ".$client->peerhost()."\n" if($opt_v); + print timestamp()."killing thread ".$handle{$uuid}{threads}{$room}->tid()." for room $room for client ".$client->peerhost()."\n" if($opt_v >= 2); $handle{$uuid}{threads}{$room}->kill('TERM'); - delete($handle{$uuid}{threads}{$room}); } # when all threads are signaled, delete all relationship entry for this client @@ -356,7 +354,7 @@ while(1) if(not defined($socket_to_handle{$client})) { $socket_to_handle{$client} = generateUUID(); - print timestamp()."generating new UUID for client ".$client->peerhost()." - ".$socket_to_handle{$client}."\n" if($opt_v); + print timestamp()."generating new UUID for client ".$client->peerhost()." - ".$socket_to_handle{$client}."\n" if($opt_v >= 2); } $uuid = $socket_to_handle{$client}; @@ -373,7 +371,7 @@ while(1) { my $new_thread = threads->new(\&doQuery, ($value, $room, $address, $uuid)); print timestamp()."created thread ".$new_thread->tid()." for processing device $address in room $room for peer ".$client->peerhost()." (UUID: $uuid)\n" if($opt_v); - + sleep 1; # detach from the thread, so the thread starts processing independantly $new_thread->detach(); @@ -381,6 +379,31 @@ while(1) $handle{$uuid}{threads}{$room} = $new_thread; } } + elsif(lc($buf) =~ /^\s*now\s*$/) # if a now command is received, all threads need to be signaled to send a now command to the presenced server + { + print timestamp()."received now command from client ".$client->peerhost()."\n" if($opt_v >= 2); + + # just to be sure if the client has really a running request + if(defined($socket_to_handle{$client})) + { + $uuid = $socket_to_handle{$client}; + # get all threads for this socket and send them a termination signal + my $temp = $handle{$uuid}{threads}; + foreach $room (keys %$temp) + { + print timestamp()."signalling thread ".$handle{$uuid}{threads}{$room}->tid()." to send \"now\"-request for room $room for client ".$client->peerhost()."\n" if($opt_v >= 2); + $handle{$uuid}{threads}{$room}->kill('HUP'); + } + + $client->send("command accepted\n"); + + } + else + { + # if there is no command running, just tell the client he's wrong + $client->send("no command running\n"); + } + } elsif(lc($buf) =~ /^\s*stop\s*$/) # if a stop command is received, the running request threads must be stopped { print timestamp()."received stop command from client ".$client->peerhost()."\n" if($opt_v); @@ -393,7 +416,7 @@ while(1) my $temp = $handle{$uuid}{threads}; foreach $room (keys %$temp) { - print timestamp()."killing thread ".$handle{$uuid}{threads}{$room}->tid()." for room $room for client ".$client->peerhost()."\n" if($opt_v); + print timestamp()."killing thread ".$handle{$uuid}{threads}{$room}->tid()." for room $room for client ".$client->peerhost()."\n" if($opt_v >= 2); $handle{$uuid}{threads}{$room}->kill('TERM'); delete($handle{$uuid}{threads}{$room}); } @@ -401,6 +424,9 @@ while(1) # when all threads are signaled, delete all relationship entry for this client delete($handle{$uuid}); delete($socket_to_handle{$client}); + + $client->send("command accepted\n"); + } else { @@ -408,6 +434,7 @@ while(1) $client->send("no command running\n"); } } + else { # if the message does not match a regular command or a stop signal, just tell the client and make a entry for logging. $client->send("command rejected\n"); @@ -430,7 +457,7 @@ while(1) my $temp = $handle{$uuid}{threads}; foreach $room (keys %$temp) { - print timestamp()."killing thread ".$handle{$uuid}{threads}{$room}->tid()." for room $room for client ".$client->peerhost()."\n" if($opt_v); + print timestamp()."killing thread ".$handle{$uuid}{threads}{$room}->tid()." for room $room for client ".$client->peerhost()."\n" if($opt_v >= 2); $handle{$uuid}{threads}{$room}->kill('TERM'); delete($handle{$uuid}{threads}{$room}); } @@ -509,6 +536,8 @@ sub daemonize sub doQuery($$$) { + + my ($do_config, $do_room, $do_address, $do_uuid) = @_; my $return; my $socket; @@ -516,28 +545,38 @@ sub doQuery($$$) my $selector; my @client_handle; my $reconnect_count = 0; + my $client_socket = undef; + # if the thread gets a termination signal, the thread must be shutdown by itself + local $SIG{TERM} = sub { + $log_queue->enqueue("$room (".threads->tid()."): terminating thread ".threads->tid()." for $address\n"); + $client_socket->send("stop\n") if($client_socket); + sleep 2; + $selector->remove($client_socket) if(defined($selector)); + close($client_socket) if($client_socket); + $client_socket = undef; + $log_queue->enqueue("$room (".threads->tid()."): exit thread ".threads->tid()."\n"); + threads->exit(); + }; + + local $SIG{HUP} = sub { + $client_socket->send("now\n") if($client_socket); + }; + + my $last_contact = gettimeofday(); my $previous_state = "absence"; my $current_state = "absence"; - my $client_socket = new IO::Socket::INET ( + $client_socket = new IO::Socket::INET ( PeerHost => $values{address}, PeerPort => $values{port}, Proto => 'tcp', Type => SOCK_STREAM, KeepAlive => 1, Blocking => 1 - ) or ( $log_queue->enqueue("$room: could not create socket to ".$values{address}." - $! - \n")); - - # if the thread gets a termination signal, the thread must be shutdown by itself - local $SIG{TERM} = sub { - $log_queue->enqueue("$room: terminating thread ".threads->tid()." for $address\n"); - $client_socket->send("stop\n") if($client_socket); - close($client_socket) if($client_socket); - threads->exit(); - }; + ) or ( $log_queue->enqueue("$room (Thread No. ".threads->tid().") : could not create socket to ".$values{address}." - $! - \n")); $selector = IO::Select->new($client_socket); @@ -561,7 +600,7 @@ sub doQuery($$$) if(defined($client_socket) and not $last_contact > (gettimeofday() - ($current_state eq "absence" ? $values{absence_timeout} : $values{presence_timeout}) - 60)) { - $log_queue->enqueue("$room: socket to ".$values{address}.":".$values{port}." did not report anything in expected time, resetting socket\n"); + $log_queue->enqueue("$room (Thread No. ".threads->tid().") socket to ".$values{address}.":".$values{port}." did not report anything in expected time, resetting socket (last contact: ".strftime("%Y-%m-%d %H:%M:%S", localtime($last_contact)).")\n"); $selector->remove($client_socket); close($client_socket); @@ -578,7 +617,7 @@ sub doQuery($$$) $status_queue->enqueue("$do_uuid;$room;socket_closed"); # create a log message - $log_queue->enqueue("$room: socket to ".$values{address}.":".$values{port}." for device $do_address closed. Trying to reconnect...\n"); + $log_queue->enqueue("$room (Thread No. ".threads->tid().") socket to ".$values{address}.":".$values{port}." for device $do_address closed. Trying to reconnect...\n"); } # now try to re-establish the connection @@ -594,7 +633,7 @@ sub doQuery($$$) if($client_socket) { # give a success message - $log_queue->enqueue("$room: reconnected to ".$values{address}.":".$values{port}." after $reconnect_count tries for device $do_address (UUID: $do_uuid)\n"); + $log_queue->enqueue("$room (Thread No. ".threads->tid().") reconnected to ".$values{address}.":".$values{port}." after $reconnect_count tries for device $do_address (UUID: $do_uuid)\n"); $status_queue->enqueue("$do_uuid;$room;socket_reconnected"); # reset the reconnect counter @@ -638,11 +677,11 @@ sub doQuery($$$) if($return =~ /command accepted/) { # log this to the thread log queue - $log_queue->enqueue("$room: accepted command for $do_address\n"); + $log_queue->enqueue("$room (Thread No. ".threads->tid().") accepted command for $do_address\n"); } elsif($return =~ /command rejected/) # if the message is "command rejected" also log it to the log queue { - $log_queue->enqueue("$room: REJECTED command for $do_address\n"); + $log_queue->enqueue("$room (Thread No. ".threads->tid().") REJECTED command for $do_address\n"); } else # else its a status message { @@ -653,7 +692,7 @@ sub doQuery($$$) if(defined($previous_state) and $previous_state eq "present" and lc($return) =~ /^absence/) { # log the timout change to the log queue - $log_queue->enqueue("$room: changing to absence timeout (".$values{absence_timeout}.") for device $do_address\n"); + $log_queue->enqueue("$room (Thread No. ".threads->tid().") changing to absence timeout (".$values{absence_timeout}.") for device $do_address\n"); $current_state = "absence"; @@ -662,7 +701,7 @@ sub doQuery($$$) } elsif(defined($previous_state) and $previous_state eq "absence" and lc($return) =~ /^present/) { - $log_queue->enqueue("$room: changing to presence timeout (".$values{presence_timeout}.") for device $do_address\n"); + $log_queue->enqueue("$room (Thread No. ".threads->tid().") changing to presence timeout (".$values{presence_timeout}.") for device $do_address\n"); $current_state = "present"; @@ -680,7 +719,6 @@ sub doQuery($$$) else # the socket is EOF which means the connection was closed { - # TODO: reconnect mechanism $selector->remove($local_client); close($local_client); $client_socket = undef;