############################################## # $Id$ package main; use strict; use warnings; use SetExtensions; sub MQTT2_DEVICE_Initialize($) { my ($hash) = @_; $hash->{Match} = ".*"; $hash->{SetFn} = "MQTT2_DEVICE_Set"; $hash->{GetFn} = "MQTT2_DEVICE_Get"; $hash->{DefFn} = "MQTT2_DEVICE_Define"; $hash->{UndefFn} = "MQTT2_DEVICE_Undef"; $hash->{AttrFn} = "MQTT2_DEVICE_Attr"; $hash->{ParseFn} = "MQTT2_DEVICE_Parse"; $hash->{RenameFn} = "MQTT2_DEVICE_Rename"; no warnings 'qw'; my @attrList = qw( IODev bridgeRegexp:textField-long devicetopic disable:0,1 disabledForIntervals model readingList:textField-long setList:textField-long getList:textField-long ); use warnings 'qw'; $hash->{AttrList} = join(" ", @attrList)." ".$readingFnAttributes; my %h = ( re=>{}, cid=>{}, bridge=>{} ); $modules{MQTT2_DEVICE}{defptr} = \%h; } ############################# sub MQTT2_DEVICE_Define($$) { my ($hash, $def) = @_; my @a = split("[ \t][ \t]*", $def); my $name = shift @a; my $type = shift @a; # always MQTT2_DEVICE $hash->{CID} = shift(@a) if(@a); return "wrong syntax for $name: define MQTT2_DEVICE [clientid]" if(int(@a)); $hash->{DEVICETOPIC} = $name; if($hash->{CID}) { my $dpc = $modules{MQTT2_DEVICE}{defptr}{cid}; if(!$dpc->{$hash->{CID}}) { $dpc->{$hash->{CID}} = []; } push(@{$dpc->{$hash->{CID}}},$hash); } AssignIoPort($hash); return undef; } ############################# sub MQTT2_DEVICE_Parse($$) { my ($iodev, $msg) = @_; my $ioname = $iodev->{NAME}; my %fnd; sub checkForGet($$$) { my ($hash, $key, $value) = @_; if($hash->{asyncGet} && $key eq $hash->{asyncGet}{reading}) { RemoveInternalTimer($hash->{asyncGet}); asyncOutput($hash->{asyncGet}{CL}, "$key $value"); delete($hash->{asyncGet}); } } my $autocreate; if($msg =~ m/^autocreate:(.*)/) { $msg = $1; $autocreate = 1; } my ($cid, $topic, $value) = split(":", $msg, 3); my $dp = $modules{MQTT2_DEVICE}{defptr}{re}; foreach my $re (keys %{$dp}) { my $reAll = $re; $reAll =~ s/\$DEVICETOPIC/\.\*/g; next if(!("$topic:$value" =~ m/^$reAll$/s || "$cid:$topic:$value" =~ m/^$reAll$/s)); foreach my $dev (keys %{$dp->{$re}}) { next if(IsDisabled($dev)); my $hash = $defs{$dev}; my $reRepl = $re; $reRepl =~ s/\$DEVICETOPIC/$hash->{DEVICETOPIC}/g; next if(!("$topic:$value" =~ m/^$reRepl$/s || "$cid:$topic:$value" =~ m/^$reRepl$/s)); my @retData; my $code = $dp->{$re}{$dev}; Log3 $dev, 4, "MQTT2_DEVICE_Parse: $dev $topic => $code"; if($code =~ m/^{.*}$/s) { $code = EvalSpecials($code, ("%TOPIC"=>$topic, "%EVENT"=>$value, "%DEVICETOPIC"=>$hash->{DEVICETOPIC}, "%NAME"=>$hash->{NAME})); my $ret = AnalyzePerlCommand(undef, $code); if($ret && ref $ret eq "HASH") { readingsBeginUpdate($hash); foreach my $k (keys %{$ret}) { readingsBulkUpdate($hash, $k, $ret->{$k}); push(@retData, "$k $ret->{$k}"); checkForGet($hash, $k, $ret->{$k}); } readingsEndUpdate($hash, 1); } } else { readingsSingleUpdate($hash, $code, $value, 1); push(@retData, "$code $value"); checkForGet($hash, $code, $value); } $fnd{$dev} = 1; } } ################################################# # autocreate and/or expand readingList if($autocreate && !%fnd) { return "" if($cid && $cid =~ m/mosqpub.*/); ################## bridge stuff my $newCid = $cid; my $bp = $modules{MQTT2_DEVICE}{defptr}{bridge}; foreach my $re (keys %{$bp}) { next if(!("$topic:$value" =~ m/^$re$/s || "$cid:$topic:$value" =~ m/^$re$/s)); my $cidExpr = $bp->{$re}; $newCid = eval $cidExpr; if($@) { Log 1, "MQTT2_DEVICE: Error evaluating $cidExpr: $@"; return ""; } last; } return if(!$newCid); PrioQueue_add(sub{ my $cidArr = $modules{MQTT2_DEVICE}{defptr}{cid}{$newCid}; return if(!$cidArr); my $add; if($value =~ m/^{.*}$/) { my $ret = json2nameValue($value); $add = "{ json2nameValue(\$EVENT) }" if(keys %{$ret}); } if(!$add) { $topic =~ m,.*/([^/]+),; $add = ($1 ? $1 : $topic); } for my $ch (@{$cidArr}) { my $nn = $ch->{NAME}; my $rl = AttrVal($nn, "readingList", ""); $rl .= "\n" if($rl); my $regexpCid = ($cid eq $newCid ? "$cid:" : ""); CommandAttr(undef, "$nn readingList $rl${regexpCid}$topic:.* $add"); } MQTT2_DEVICE_Parse($iodev, $msg); }, undef); my $cidArr = $modules{MQTT2_DEVICE}{defptr}{cid}{$newCid}; return "UNDEFINED MQTT2_$newCid MQTT2_DEVICE $newCid" if(!$cidArr); return ""; } return keys %fnd; } # compatibility: the first version was implemented as MQTT2_JSON and published. sub MQTT2_JSON($;$) { return json2nameValue($_[0], $_[1]); } sub MQTT2_getCmdHash($) { my ($list) = @_; my (%h, @cmd); map { my ($k,$v) = split(" ",$_,2); push @cmd, $k; $k =~ s/:.*//; # potential arguments $h{$k} = $v; } grep /./, split("\n", $list); return (\%h, join(" ",@cmd)); } ############################# # replace {} and $EVENT. Used both in set and get sub MQTT2_buildCmd($$$) { my ($hash, $a, $cmd) = @_; shift @{$a}; if($cmd =~ m/^{.*}$/) { $cmd = EvalSpecials($cmd, ("%EVENT"=>join(" ",@{$a}), "%NAME"=>$hash->{NAME})); $cmd = AnalyzeCommandChain($hash->{CL}, $cmd); return if(!$cmd); } else { if($cmd =~ m/\$EV/) { # replace EVENT & $EVTPART my $event = join(" ",@{$a}); $cmd =~ s/\$EVENT/$event/g; for(my $i=0; $i<@{$a}; $i++) { my $n = "\\\$EVTPART$i"; $cmd =~ s/$n/$a->[$i]/ge; } } else { shift @{$a}; $cmd .= " ".join(" ",@{$a}) if(@{$a}); } } $cmd =~ s/\$DEVICETOPIC/$hash->{DEVICETOPIC}/g; return $cmd; } ############################# sub MQTT2_DEVICE_Get($@) { my ($hash, @a) = @_; return "Not enough arguments for get" if(!defined($a[1])); my ($gets,$cmdList) = MQTT2_getCmdHash(AttrVal($hash->{NAME}, "getList", "")); return "Unknown argument $a[1], choose one of $cmdList" if(!$gets->{$a[1]}); return undef if(IsDisabled($hash->{NAME})); my ($getReading, $cmd) = split(" ",$gets->{$a[1]},2); if($hash->{CL}) { my $tHash = { hash=>$hash, CL=>$hash->{CL}, reading=>$getReading }; $hash->{asyncGet} = $tHash; InternalTimer(gettimeofday()+4, sub { asyncOutput($tHash->{CL}, "Timeout reading answer for $cmd"); delete($hash->{asyncGet}); }, $tHash, 0); } $cmd = MQTT2_buildCmd($hash, \@a, $cmd); return if(!$cmd); IOWrite($hash, "publish", $cmd); return undef; } ############################# sub MQTT2_DEVICE_Set($@) { my ($hash, @a) = @_; return "Not enough arguments for set" if(!defined($a[1])); my ($sets,$cmdList) = MQTT2_getCmdHash(AttrVal($hash->{NAME}, "setList", "")); my $cmdName = $a[1]; my $cmd = $sets->{$cmdName}; return SetExtensions($hash, $cmdList, @a) if(!$cmd); return undef if(IsDisabled($hash->{NAME})); $cmd = MQTT2_buildCmd($hash, \@a, $cmd); return if(!$cmd); IOWrite($hash, "publish", $cmd); readingsSingleUpdate($hash, "state", $cmdName, 1); return undef; } sub MQTT2_DEVICE_Attr($$) { my ($type, $dev, $attrName, $param) = @_; my $hash = $defs{$dev}; if($attrName eq "devicetopic") { $hash->{DEVICETOPIC} = ($type eq "del" ? $hash->{NAME} : $param); return undef; } if($attrName =~ m/(.*)List/) { my $atype = $1; if($type eq "del") { MQTT2_DEVICE_delReading($dev) if($atype eq "reading"); return undef; } return "$dev attr $attrName: more parameters needed" if(!$param); #90145 foreach my $el (split("\n", $param)) { my ($par1, $par2) = split(" ", $el, 2); next if(!$par1); (undef, $par2) = split(" ", $par2, 2) if($type eq "get"); return "$dev attr $attrName: more parameters needed" if(!$par2); if($atype eq "reading") { if($par2 =~ m/^{.*}$/) { my $ret = perlSyntaxCheck($par2, ("%TOPIC"=>1, "%EVENT"=>"0 1 2 3 4 5 6 7 8 9", "%NAME"=>$dev, "%DEVICETOPIC"=>$hash->{DEVICETOPIC})); return $ret if($ret); } else { return "unsupported character in readingname $par2" if(!goodReadingName($par2)); } } else { my $ret = perlSyntaxCheck($par2, ("%EVENT"=>"0 1 2 3 4 5 6 7 8 9")); return $ret if($ret); } } MQTT2_DEVICE_addReading($dev, $param) if($atype eq "reading"); } if($attrName eq "bridgeRegexp" && $type eq "set") { foreach my $el (split("\n", $param)) { my ($par1, $par2) = split(" ", $el, 2); next if(!$par1); return "$dev attr $attrName: more parameters needed" if(!$par2); eval { "Hallo" =~ m/^$par1$/ }; return "$dev $attrName regexp error: $@" if($@); $modules{MQTT2_DEVICE}{defptr}{bridge}{$par1} = $par2; } if($init_done) { my $name = $hash->{NAME}; AnalyzeCommandChain(undef, "deleteattr $name readingList; deletereading $name .*"); } } return undef; } sub MQTT2_DEVICE_delReading($) { my ($name) = @_; my $dp = $modules{MQTT2_DEVICE}{defptr}{re}; foreach my $re (keys %{$dp}) { if($dp->{$re}{$name}) { delete($dp->{$re}{$name}); delete($dp->{$re}) if(!int(keys %{$dp->{$re}})); } } } sub MQTT2_DEVICE_addReading($$) { my ($name, $param) = @_; foreach my $line (split("\n", $param)) { my ($re,$code) = split(" ", $line,2); $modules{MQTT2_DEVICE}{defptr}{re}{$re}{$name} = $code if($re && $code); } } ##################################### sub MQTT2_DEVICE_Rename($$) { my ($new, $old) = @_; MQTT2_DEVICE_delReading($old); MQTT2_DEVICE_addReading($new, AttrVal($new, "readingList", "")); return undef; } ##################################### sub MQTT2_DEVICE_Undef($$) { my ($hash, $arg) = @_; MQTT2_DEVICE_delReading($arg); if($hash->{CID}) { my $dpc = $modules{MQTT2_DEVICE}{defptr}{cid}{$hash->{CID}}; my @nh = grep { $_->{NAME} ne $hash->{NAME} } @{$dpc}; $modules{MQTT2_DEVICE}{defptr}{cid}{$hash->{CID}} = \@nh; } return undef; } 1; =pod =item summary devices communicating via the MQTT2_SERVER or MQTT2_CLIENT =item summary_DE über den MQTT2_SERVER oder MQTT2_CLIENT kommunizierende Geräte =begin html

MQTT2_DEVICE

=end html =cut