diff --git a/fhem/FHEM/00_MQTT2_SERVER.pm b/fhem/FHEM/00_MQTT2_SERVER.pm index 3ca8b486e..f6c870aea 100644 --- a/fhem/FHEM/00_MQTT2_SERVER.pm +++ b/fhem/FHEM/00_MQTT2_SERVER.pm @@ -9,11 +9,10 @@ use warnings; use TcpServerUtils; use MIME::Base64; -sub MQTT2_SERVER_Parse($$$); sub MQTT2_SERVER_Read($@); sub MQTT2_SERVER_Write($$$); sub MQTT2_SERVER_Undef($@); -sub MQTT2_SERVER_doPublish($$$;$$); +sub MQTT2_SERVER_doPublish($$$$;$); # See also: @@ -25,6 +24,7 @@ MQTT2_SERVER_Initialize($) my ($hash) = @_; $hash->{Clients} = ":MQTT2_DEVICE:"; + $hash->{MatchList}= { "1:MQTT2_DEVICE" => "^.*" }, $hash->{ReadFn} = "MQTT2_SERVER_Read"; $hash->{DefFn} = "MQTT2_SERVER_Define"; $hash->{AttrFn} = "MQTT2_SERVER_Attr"; @@ -61,7 +61,7 @@ MQTT2_SERVER_Define($$) Log3 $hash, 1, "$ret. Exiting."; exit(1); } - $hash->{NRCLIENTS} = 0; + readingsSingleUpdate($hash, "nrclients", 0, 0); $hash->{clients} = {}; $hash->{retain} = {}; InternalTimer(1, "MQTT2_SERVER_keepaliveChecker", $hash, 0); @@ -73,12 +73,15 @@ MQTT2_SERVER_keepaliveChecker($) { my ($hash) = @_; my $now = gettimeofday(); - foreach my $clName (keys %{$hash->{clients}}) { - my $cHash = $defs{$clName}; - next if(!$cHash || !$cHash->{keepalive} || - $now < $cHash->{lastMsgTime}+$cHash->{keepalive}*1.5 ); - Log3 $hash, 3, "$hash->{NAME}: $clName left us (keepalive check)"; - CommandDelete(undef, $clName); + my $multiplier = AttrVal($hash, "keepaliveFactor", 1.5); + if($multiplier) { + foreach my $clName (keys %{$hash->{clients}}) { + my $cHash = $defs{$clName}; + next if(!$cHash || !$cHash->{keepalive} || + $now < $cHash->{lastMsgTime}+$cHash->{keepalive}*$multiplier ); + Log3 $hash, 3, "$hash->{NAME}: $clName left us (keepalive check)"; + CommandDelete(undef, $clName); + } } InternalTimer($now+10, "MQTT2_SERVER_keepaliveChecker", $hash, 0); } @@ -93,11 +96,12 @@ MQTT2_SERVER_Undef($@) my $shash = $defs{$sname}; delete($shash->{clients}{$hash->{NAME}}); - $shash->{NRCLIENTS}--; + readingsSingleUpdate($shash, "nrclients", + ReadingsVal($sname, "nrclients", 1)-1, 1); if($hash->{lwt}) { # Last will my ($tp, $val) = split(':', $hash->{lwt}, 2); - MQTT2_SERVER_doPublish($shash, $tp, $val, undef, $hash->{cflags} & 0x20); + MQTT2_SERVER_doPublish($shash, $shash, $tp, $val, $hash->{cflags} & 0x20); } return $ret; } @@ -133,7 +137,7 @@ MQTT2_SERVER_Set($@) return "Usage: publish -r topic [value]" if(@a < 1); my $tp = shift(@a); my $val = join(" ", @a); - MQTT2_SERVER_doPublish($hash, $tp, $val, undef, $retain); + MQTT2_SERVER_doPublish($hash->{CL}, $hash, $tp, $val, $retain); } } @@ -167,7 +171,8 @@ MQTT2_SERVER_Read($@) my $nhash = TcpServer_Accept($hash, "MQTT2_SERVER"); return if(!$nhash); $nhash->{CD}->blocking(0); - $hash->{NRCLIENTS}++; + readingsSingleUpdate($hash, "nrclients", + ReadingsVal($hash->{NAME}, "nrclients", 0)+1, 1); return; } @@ -280,7 +285,7 @@ MQTT2_SERVER_Read($@) $val = substr($pl, $off); Log3 $sname, 4, "$cname $hash->{cid} $cpt $tp:$val"; addToWritebuffer($hash, pack("CCnC*", 0x40, 2, $pid)) if($qos); # PUBACK - MQTT2_SERVER_doPublish($defs{$sname}, $tp, $val, $cname, $cf & 0x01); + MQTT2_SERVER_doPublish($hash, $defs{$sname}, $tp, $val, $cf & 0x01); #################################### } elsif($cpt eq "SUBSCRIBE") { @@ -328,25 +333,29 @@ MQTT2_SERVER_Read($@) ###################################### # Call sendto for all clients + Dispatch + dotrigger if rawEvents is set sub -MQTT2_SERVER_doPublish($$$;$$) +MQTT2_SERVER_doPublish($$$$;$) { - my ($hash, $tp, $val, $src, $retain) = @_; + my ($src, $tgt, $tp, $val, $retain) = @_; $val = "" if(!defined($val)); + $src = $tgt if(!defined($src)); if($retain) { my $now = gettimeofday(); my %h = ( ts=>$now, val=>$val ); - $hash->{retain}{$tp} = \%h; + $tgt->{retain}{$tp} = \%h; } - foreach my $clName (keys %{$hash->{clients}}) { - MQTT2_SERVER_sendto($defs{$clName}, $tp, $val) if(!$src || $src ne $clName); + foreach my $clName (keys %{$tgt->{clients}}) { + MQTT2_SERVER_sendto($defs{$clName}, $tp, $val) if($src->{NAME} ne $clName); } - Dispatch($hash, "$tp:$val", undef, 1); - - my $re = AttrVal($hash->{NAME}, "rawEvents", undef); - DoTrigger($hash->{NAME}, "$tp:$val") if($re && $tp =~ m/$re/); + if($src->{cid}) { # "real" MQTT client + my $cid = $src->{cid}; + $cid =~ s,[^a-z0-9._],_,gi; + Dispatch($tgt, "$cid:$tp:$val", undef, 1); + my $re = AttrVal($tgt->{NAME}, "rawEvents", undef); + DoTrigger($tgt->{NAME}, "$tp:$val") if($re && $tp =~ m/$re/); + } } ###################################### @@ -382,7 +391,7 @@ sub MQTT2_SERVER_Write($$$) { my ($hash,$topic,$msg) = @_; - MQTT2_SERVER_doPublish($hash, $topic, $msg); + MQTT2_SERVER_doPublish($hash, $hash, $topic, $msg); } sub @@ -490,6 +499,18 @@ MQTT2_SERVER_getStr($$) Should only be used, if there is no MQTT2_DEVICE to process the topic.
+ +
  • keepaliveFactor
    + the oasis spec requires a disconnect, if after 1.5 times the client + supplied keepalive no data or PINGREQ is sent. With this attribute you + can modify this factor, 0 disables the check. + Notes: + +
  • +
  • SSL
    Enable SSL (i.e. TLS) diff --git a/fhem/FHEM/10_MQTT2_DEVICE.pm b/fhem/FHEM/10_MQTT2_DEVICE.pm index 90b8ed7d1..370e60f6e 100644 --- a/fhem/FHEM/10_MQTT2_DEVICE.pm +++ b/fhem/FHEM/10_MQTT2_DEVICE.pm @@ -2,8 +2,6 @@ # $Id$ package main; -# TODO: autocreate - use strict; use warnings; use SetExtensions; @@ -46,6 +44,7 @@ MQTT2_DEVICE_Define($$) my @a = split("[ \t][ \t]*", $def); my $name = shift @a; my $type = shift @a; # always MQTT2_DEVICE + shift(@a) if(@a && $a[0] eq "autocreated"); return "wrong syntax for $name: define MQTT2_DEVICE" if(int(@a)); @@ -72,10 +71,11 @@ MQTT2_DEVICE_Parse($$) } } - my ($topic, $value) = split(":", $msg, 2); + my ($cid, $topic, $value) = split(":", $msg, 3); my $dp = $modules{MQTT2_DEVICE}{defptr}; foreach my $re (keys %{$dp}) { - next if($msg !~ m/^$re$/s); + next if(!("$topic:$value" =~ m/^$re$/s || + "$cid:$topic:$value" =~ m/^$re$/s)); foreach my $dev (keys %{$dp->{$re}}) { next if(IsDisabled($dev)); my @retData; @@ -86,13 +86,15 @@ MQTT2_DEVICE_Parse($$) if($code =~ m/^{.*}$/s) { $code = EvalSpecials($code, ("%TOPIC"=>$topic, "%EVENT"=>$value)); my $ret = AnalyzePerlCommand(undef, $code); - readingsBeginUpdate($hash); - foreach my $k (keys %{$ret}) { - readingsBulkUpdate($hash, $k, $ret->{$k}); - push(@retData, "$k $ret->{$k}"); - checkForGet($hash, $k, $ret->{$k}); + if($ret && ref $ret eq "HASH") { + readingsBeginUpdate($hash); + foreach my $k (keys %{$ret}) { + readingsBulkUpdate($hash, $k, $ret->{$k}); + push(@retData, "$k $ret->{$k}"); + checkForGet($hash, $k, $ret->{$k}); + } + readingsEndUpdate($hash, 1); } - readingsEndUpdate($hash, 1); } else { readingsSingleUpdate($hash, $code, $value, 1); @@ -104,6 +106,27 @@ MQTT2_DEVICE_Parse($$) } } + # autocreate, init readingList if message is a json string + # deactivated, as there are a lot of messages to be catched +# if(!@ret) { +# my $nn = "MQTT2_$cid"; +# if(!$defs{$nn} && $cid !~ m/mosqpub.*/) { +# PrioQueue_add(sub{ +# return if(!$defs{$nn}); +# if($value =~ m/^{.*}$/) { +# my %ret = MQTT2_JSON($msg); +# if(keys %ret) { +# CommandAttr(undef, +# "$nn readingList $cid:$topic:.* { MQTT2_JSON(\$EVENT) }"); +# } +# } +# $defs{$nn}{autocreated_on} = $msg; +# }, undef); +# return "UNDEFINED $nn MQTT2_DEVICE autocreated" +# } +# return ""; +# } + return @ret; } @@ -274,6 +297,7 @@ MQTT2_DEVICE_Attr($$) return undef; } + return "$dev attr $attrName: more parameters needed" if(!$param); #90145 foreach my $el (split("\n", $param)) { my ($par1, $par2) = split(" ", $el, 2); next if(!$par1); @@ -394,11 +418,13 @@ MQTT2_DEVICE_Undef($$)
  • readingList <regexp> [readingName|perl-Expression] ...
    - If the regexp matches topic:message either set readingName to - the published message, or evaluate the perl expression, which has to - return a hash consisting of readingName=>readingValue entries. + If the regexp matches topic:message or cid:topic:message either set + readingName to the published message, or evaluate the perl expression, + which has to return a hash consisting of readingName=>readingValue + entries. You can define multiple such tuples, separated by newline, the newline - does not have to be entered in the FHEMWEB frontend.
    + does not have to be entered in the FHEMWEB frontend. cid is the client-id + of the sending device.
    Example:
      attr dev readingList\
    diff --git a/fhem/FHEM/93_FHEM2FHEM.pm b/fhem/FHEM/93_FHEM2FHEM.pm index 34dd0565a..fc4c5a89b 100644 --- a/fhem/FHEM/93_FHEM2FHEM.pm +++ b/fhem/FHEM/93_FHEM2FHEM.pm @@ -59,9 +59,10 @@ FHEM2FHEM_Define($$) my $iodev = $defs{$rdev}; return "Undefined local device $rdev" if(!$iodev); $hash->{rawDevice} = $rdev; - $hash->{Clients} = $iodev->{Clients}; - $hash->{Clients} = $modules{$iodev->{TYPE}}{Clients} - if(!$hash->{Clients}); + + my $iomod = $modules{$iodev->{TYPE}}; + $hash->{Clients} = $iodev->{Clients} ? $iodev->{Clients} :$iomod->{Clients}; + $hash->{MatchList} = $iomod->{MatchList} if($iomod->{MatchList}); }