From 17417668b7e24d15fe45191609f9892be8bfb66a Mon Sep 17 00:00:00 2001 From: borisneubert <> Date: Tue, 7 Apr 2015 17:24:04 +0000 Subject: [PATCH] SubProcess.pm: added two-way communication between parent and child with simple transport protocol git-svn-id: https://svn.fhem.de/fhem/trunk/fhem@8393 2b470e98-0d58-463d-a4d8-8e2adae1ed80 --- FHEM/SubProcess.pm | 150 +++++++++++++++++++--- contrib/SubProcess/98_SubProcessTester.pm | 47 ++++++- 2 files changed, 171 insertions(+), 26 deletions(-) diff --git a/FHEM/SubProcess.pm b/FHEM/SubProcess.pm index 075c4b93e..dbce96c2e 100644 --- a/FHEM/SubProcess.pm +++ b/FHEM/SubProcess.pm @@ -30,15 +30,28 @@ use POSIX ":sys_wait_h"; use Socket; use IO::Handle; +# # creates a new subprocess +# sub new() { my ($class, $args)= @_; my ($child, $parent); - socketpair($child, $parent, AF_UNIX, SOCK_STREAM, PF_UNSPEC) || return undef; # die "socketpair: $!"; + # http://perldoc.perl.org/functions/socketpair.html + # man 2 socket + # AF_UNIX Local communication + # SOCK_STREAM Provides sequenced, reliable, two-way, connection-based + # byte streams. An out-of-band data transmission mechanism + # may be supported + # + socketpair($child, $parent, AF_UNIX, SOCK_STREAM || SOCK_NONBLOCK, PF_UNSPEC) || + return undef; # die "socketpair: $!"; $child->autoflush(1); $parent->autoflush(1); + my %childBuffer= (); + my %parentBuffer= (); + my $self= { onRun => $args->{onRun}, @@ -46,6 +59,9 @@ sub new() { timeout => $args->{timeout}, child => $child, parent => $parent, + pid => undef, + childBufferRef => \%childBuffer, + parentBufferRef => \%parentBuffer, }; # we are a hash reference @@ -53,14 +69,19 @@ sub new() { } +# +# returns the pid of the subprocess +# undef if subprocess not available +# sub pid() { my $self= shift; return $self->{pid}; } - -# check if child process is still running +# +# return 1 if subprocess is still running, else 0 +# sub running() { my $self= shift; @@ -69,7 +90,9 @@ sub running() { return waitpid($pid, WNOHANG) > 0 ? 1 : 0; } -# waits for the child process to terminate +# +# waits for the subprocess to terminate +# sub wait() { my $self= shift; @@ -82,6 +105,8 @@ sub wait() { } # +# send a POSIX signal to the subproess +# sub signal() { my ($self, $signal)= @_; @@ -90,41 +115,132 @@ sub signal() { return kill $signal, $pid; } -# terminates a child process (HUP) +# +# terminates thr subprocess (HUP) +# sub terminate() { my $self= shift; return $self->signal('HUP'); } -# terminates a child process (KILL) +# +# kills the subprocess (KILL) +# sub kill() { my $self= shift; return $self->signal('KILL'); } +# +# the socket used by the parent to communicate with the subprocess +# sub child() { my $self= shift; return $self->{child}; } +# +# the socket used by the subprocess to communicate with the parent +# sub parent() { my $self= shift; return $self->{parent}; } -# this function is called from the parent to read from the child -# returns undef on error or if nothing was read -sub read() { - - my $self= shift; - my ($bytes, $result); - $bytes= sysread($self->child(), $result, 1024*1024); - return defined($bytes) ? $result : undef; +# this is a helper function for reading +sub readFrom() { + my ($self, $fh, $bufferRef)= @_; + my %buffer= %{$bufferRef}; + + my $rin= ''; + vec($rin, fileno($fh), 1)= 1; + return undef unless select($rin, undef, undef, 0.001); + my $result= undef; + my $data; + my $bytes= sysread($fh, $data, 1024); + return undef unless(defined($bytes) && $bytes); + #main::Debug "SUBPROCESS: read \"$data\""; + + # prepend buffer if buffer is set + $data= $buffer{data} . $data if(defined($buffer{data})); + my $len= length($data); + #main::Debug "SUBPROCESS: data is now \"$data\" (length: $len)"; + # get or set size (32bit unsigned integer in network byte order) + my $size= defined($buffer{size}) ? $buffer{size} : undef; + if(!defined($size) && $len>= 4) { + $size= unpack("N", $data); + $data= substr($data, 4); + $len-= 4; + #main::Debug "SUBPROCESS: got size: $size"; + } + # get the datagram if size is set and data length is at least size + if(defined($size) && $len>= $size) { + $result= substr($data, 0, $size); + $size= undef; + #main::Debug "SUBPROCESS: data complete: \"$data\""; + } + # set buffer + $buffer{data}= $data; + $buffer{size}= $size; + # return result + return $result; } -# starts the child process +# this is a helper function for writing +sub writeTo() { + my ($self, $fh, $msg)= @_; + my $win= ''; + vec($win, fileno($fh), 1)= 1; + return undef unless select(undef, $win, undef, 0.001); + my $size= pack("N", length($msg)); + my $bytes= syswrite($fh, $size . $msg); + return $bytes; +} + + + +# this function is called from the parent to read from the subprocess +# returns undef on error or if nothing was read +sub readFromChild() { + + my $self= shift; + + return $self->readFrom($self->child(), $self->{childBufferRef}); +} + + +# this function is called from the parent to write to the subprocess +# returns 0 on error, else 1 +sub writeToChild() { + + my ($self, $msg)= @_; + return $self->writeTo($self->child(), $msg); +} + + +# this function is called from the subprocess to read from the parent +# returns undef on error or if nothing was read +sub readFromParent() { + + my $self= shift; + return $self->readFrom($self->parent(), $self->{parentBufferRef}); +} + + +# this function is called from the subprocess to write to the parent +# returns 0 on error, else 1 +sub writeToParent() { + + my ($self, $msg)= @_; + return $self->writeTo($self->parent(), $msg); +} + + +# +# starts the subprocess +# sub run() { my $self= shift; @@ -139,8 +255,6 @@ sub run() { if(!$pid) { # CHILD - #close(CHILD); - #main::Debug "PARENT FD= " . fileno $self->{parent}; # run my $onRun= $self->{onRun}; @@ -161,8 +275,6 @@ sub run() { } else { # PARENT - #close(PARENT); - #main::Debug "CHILD FD= " . fileno $self->{child}; main::Log3 $pid, 5, "SubProcess $pid created."; diff --git a/contrib/SubProcess/98_SubProcessTester.pm b/contrib/SubProcess/98_SubProcessTester.pm index cae9638d6..63021e957 100644 --- a/contrib/SubProcess/98_SubProcessTester.pm +++ b/contrib/SubProcess/98_SubProcessTester.pm @@ -52,29 +52,38 @@ SubProcessTester_Initialize($) { $hash->{ShutdownFn} = "SubProcessTester_Shutdown"; #$hash->{ReadyFn} = "SubProcessTester_Ready"; #$hash->{GetFn} = "SubProcessTester_Get"; - #$hash->{SetFn} = "SubProcessTester_Set"; + $hash->{SetFn} = "SubProcessTester_Set"; #$hash->{AttrFn} = "SubProcessTester_Attr"; #$hash->{AttrList}= ""; } ##################################### # -# Functions called from sub process +# Functions called from subprocess # ##################################### sub onRun($) { my $subprocess= shift; my $parent= $subprocess->parent(); - Log3 undef, 1, "RUN RUN RUN RUN..."; + Log3 undef, 1, "SUBPROCESS: Running..."; my $foobar= $subprocess->{foobar}; - for(my $i= 0; $i< 10; $i++) { + for(my $i= 0; $i< 30; $i++) { + + + my $msg= $subprocess->readFromParent(); + if(defined($msg)) { + Log3 undef, 1, "SUBPROCESS read from parent: $msg"; + $subprocess->writeToParent("echo: $msg"); + } #Log3 undef, 1, "Step $i"; # here we write something to the parent process # this is received via the global select loop # and evaluated in the ReadFn. - print $parent "$foobar $i\n"; - $parent->flush(); + $subprocess->writeToParent("$foobar $i\n"); + #print $parent "$foobar $i\n"; + #$parent->flush(); + # has the pa sleep 5; } } @@ -117,6 +126,26 @@ sub SubProcessTester_Shutdown($$) { return undef; } +sub SubProcessTester_Set() { + + my ($hash, @a)= @_; + + my $name= $hash->{NAME}; + my $cmdname= $a[1]; + my $value= $a[2]; + if($cmdname eq "send") { + my $subprocess= $hash->{fhem}{subprocess}; + Log3 $hash, 5, "Before send...."; + $subprocess->writeToChild($value); + Log3 $hash, 5, "After send...."; + return undef; + } else { + return "Unknown argument $cmdname, choose one of send"; + } + + +} + ##################################### sub SubProcessTester_DoInit($) { @@ -126,7 +155,11 @@ sub SubProcessTester_DoInit($) { $hash->{fhem}{subprocess}= undef; my $subprocess= SubProcess->new( { onRun => \&onRun, onExit => \&onExit } ); + # you can set your own variables like this: $subprocess->{foobar}= "foo / bar"; + # remember: as soon as the subprocess is started, parent and child process live + # in separate processes and cannot share data anymore - changing variables in + # the parent does not affect variables in the child and vice versa. my $pid= $subprocess->run(); return unless($pid); @@ -175,7 +208,7 @@ sub SubProcessTester_Read($) { # here we read from the global select loop what was # written in the onRun function - my $result= $subprocess->read(); + my $result= $subprocess->readFromChild(); if(defined($result)) { chomp $result; readingsSingleUpdate($hash, "step", $result, 1);