diff --git a/CHANGED b/CHANGED index 43ff81712..8183278e6 100644 --- a/CHANGED +++ b/CHANGED @@ -1,5 +1,6 @@ # Add changes at the top of the list. Keep it in ASCII, and 80-char wide. # Do not insert empty lines here, update check depends on it. + - feature: 00_10_MQTT2_CLIENT added (Forum #92888) - bugfix: 46_Aqicn: fix commandref, change to packages - bugfix: 88_HMCCU: fixed set toggle command - new: 98_livetracking: added module diff --git a/FHEM/00_MQTT2_CLIENT.pm b/FHEM/00_MQTT2_CLIENT.pm new file mode 100644 index 000000000..5e2d6bcfd --- /dev/null +++ b/FHEM/00_MQTT2_CLIENT.pm @@ -0,0 +1,494 @@ +############################################## +# $Id$ +package main; + +use strict; +use warnings; +use DevIo; + +sub MQTT2_CLIENT_Read($@); +sub MQTT2_CLIENT_Write($$$); +sub MQTT2_CLIENT_Undef($@); + +my $keepalive = 30; + +# See also: +# http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html + +sub +MQTT2_CLIENT_Initialize($) +{ + my ($hash) = @_; + + $hash->{Clients} = ":MQTT2_DEVICE:"; + $hash->{MatchList}= { "1:MQTT2_DEVICE" => "^.*" }, + $hash->{ReadFn} = "MQTT2_CLIENT_Read"; + $hash->{DefFn} = "MQTT2_CLIENT_Define"; + $hash->{AttrFn} = "MQTT2_CLIENT_Attr"; + $hash->{SetFn} = "MQTT2_CLIENT_Set"; + $hash->{UndefFn} = "MQTT2_CLIENT_Undef"; + $hash->{WriteFn} = "MQTT2_CLIENT_Write"; + $hash->{ReadyFn} = "MQTT2_CLIENT_connect"; + + no warnings 'qw'; + my @attrList = qw( + autocreate + clientId + disable:0,1 + disabledForIntervals + lwt + lwtRetain + mqttVersion:3.1.1,3.1 + rawEvents + subscriptions + SSL + username + ); + use warnings 'qw'; + $hash->{AttrList} = join(" ", @attrList); +} + +##################################### +sub +MQTT2_CLIENT_Define($$) +{ + my ($hash, $def) = @_; + my ($name, $type, $host) = split("[ \t]+", $def); + return "Usage: define MQTT2_CLIENT :" + if(!$host); + + MQTT2_CLIENT_Undef($hash, undef) if($hash->{OLDDEF}); # modify + + $hash->{DeviceName} = $host; + $hash->{clientId} = $hash->{NAME}; + $hash->{clientId} =~ s/[^0-9a-zA-Z]//g; + $hash->{clientId} = "MQTT2_CLIENT" if(!$hash->{clientId}); + $hash->{connecting} = 1; + + InternalTimer(1, "MQTT2_CLIENT_connect", $hash, 0); # need attributes + return undef; +} + +sub +MQTT2_CLIENT_connect($) +{ + my ($hash) = @_; + return DevIo_OpenDev($hash, 0, "MQTT2_CLIENT_doinit") + if($hash->{connecting}); +} + +sub +MQTT2_CLIENT_doinit($;$) +{ + my ($hash, $subscribe) = @_; + my $name = $hash->{NAME}; + + delete($hash->{connecting}); + ############################## CONNECT + if(!$subscribe) { + my $usr = AttrVal($name, "username", ""); + my ($err, $pwd) = getKeyValue($name); + if($err) { + Log 1, "ERROR: $err"; + return; + } + my ($lwtt, $lwtm) = split(" ",AttrVal($name, "lwt", ""),2); + my $lwtr = AttrVal($name, "lwtRetain", 0); + my $m31 = (AttrVal($name, "mqttVersion", "3.1") eq "3.1"); + my $msg = + ($m31 ? pack("n",6)."MQIsdp".pack("C",3): + pack("n",4)."MQTT" .pack("C",4)). + pack("C", ($usr ? 0x80 : 0)+ + ($pwd ? 0x40 : 0)+ + ($lwtr ? 0x20 : 0)+ + ($lwtt ? 0x04 : 0)+2). # clean session + pack("n", $keepalive). + pack("n", length($hash->{clientId})).$hash->{clientId}. + ($lwtt ? (pack("n", length($lwtt)).$lwtt). + (pack("n", length($lwtm)).$lwtm) : ""). + ($usr ? (pack("n", length($usr)).$usr) : ""). + ($pwd ? (pack("n", length($pwd)).$pwd) : ""); + addToWritebuffer($hash, + pack("C",0x10). + MQTT2_CLIENT_calcRemainingLength(length($msg)).$msg); + RemoveInternalTimer($hash); + InternalTimer(gettimeofday()+$keepalive, "MQTT2_CLIENT_keepalive",$hash,0); + + ############################## SUBSCRIBE + } else { + my $msg = + pack("n", $hash->{FD}). # packed Identifier + join("", map { pack("n", length($_)).$_.pack("C",0) } # QOS:0 + split(" ", AttrVal($name, "subscriptions", "#"))); + addToWritebuffer($hash, + pack("C",0x80). + MQTT2_CLIENT_calcRemainingLength(length($msg)).$msg); + + } + return undef; +} + +sub +MQTT2_CLIENT_keepalive($) +{ + my ($hash) = @_; + my $name = $hash->{NAME}; + return if(ReadingsVal($name, "state", "") ne "opened"); + Log3 $name, 5, "$name: keepalive $keepalive"; + my $msg = join("", map { pack("n", length($_)).$_.pack("C",0) } # QOS:0 + split(" ", AttrVal($name, "subscriptions", "#"))); + addToWritebuffer($hash, + pack("C",0xC0).pack("C",0)); + InternalTimer(gettimeofday()+$keepalive, "MQTT2_CLIENT_keepalive", $hash, 0); +} + +sub +MQTT2_CLIENT_Undef($@) +{ + my ($hash, $arg) = @_; + DevIo_CloseDev($hash); + return undef; +} + +sub +MQTT2_CLIENT_Attr(@) +{ + my ($type, $devName, $attrName, @param) = @_; + my $hash = $defs{$devName}; + if($type eq "set" && $attrName eq "SSL") { + $hash->{SSL} = $param[0] ? $param[0] : 1; + } + + if($attrName eq "clientId") { + $hash->{clientId} = $param[0]; + $hash->{clientId} =~ s/[^0-9a-zA-Z]//g; + $hash->{clientId} = "MQTT2_CLIENT" if(!$hash->{clientId}); + } + + my %h = (clientId=>1,lwt=>1,lwtRetain=>1,subscriptions=>1,SSL=>1,username=>1); + if($init_done && $h{$attrName}) { + MQTT2_CLIENT_Disco($hash); + } + return undef; +} + +sub +MQTT2_CLIENT_Disco($) +{ + my ($hash) = @_; + RemoveInternalTimer($hash); + $hash->{connecting} = 1; + DevIo_Disconnected($hash); +} + +sub +MQTT2_CLIENT_Set($@) +{ + my ($hash, @a) = @_; + my %sets = ( password=>2, publish=>2 ); + my $name = $hash->{NAME}; + shift(@a); + + return "Unknown argument ?, choose one of ".join(" ", keys %sets) + if(!$a[0] || !$sets{$a[0]}); + + if($a[0] eq "publish") { + shift(@a); + my $retain; + if(@a>2 && $a[0] eq "-r") { + $retain = 1; + shift(@a); + } + return "Usage: set $name publish -r topic [value]" if(@a < 1); + my $tp = shift(@a); + my $val = join(" ", @a); + MQTT2_CLIENT_doPublish($hash, $tp, $val, $retain); + } + + if($a[0] eq "password") { + return "Usage: set $name password " if(@a < 1); + setKeyValue($name, $a[1]); + MQTT2_CLIENT_Disco($hash) if($init_done); + } +} + +my %cptype = ( + 0 => "RESERVED_0", + 1 => "CONNECT", + 2 => "CONNACK", # + 3 => "PUBLISH", # + 4 => "PUBACK", # + 5 => "PUBREC", + 6 => "PUBREL", + 7 => "PUBCOMP", + 8 => "SUBSCRIBE", + 9 => "SUBACK", # + 10 => "UNSUBSCRIBE", + 11 => "UNSUBACK", + 12 => "PINGREQ", + 13 => "PINGRESP", # + 14 => "DISCONNECT",# + 15 => "RESERVED_15", +); + +##################################### +sub +MQTT2_CLIENT_Read($@) +{ + my ($hash, $reread) = @_; + + my $name = $hash->{NAME}; + my $fd = $hash->{FD}; + + if(!$reread) { + my $buf = DevIo_SimpleRead($hash); + return "" if(!defined($buf)); + $hash->{BUF} .= $buf; + } + + my ($tlen, $off) = MQTT2_CLIENT_getRemainingLength($hash); + if($tlen < 0) { + Log3 $name, 1, "Bogus data from $name, closing connection"; + MQTT2_CLIENT_Disco($hash); + } + return if(length($hash->{BUF}) < $tlen+$off); + + my $fb = substr($hash->{BUF}, 0, 1); + my $pl = substr($hash->{BUF}, $off, $tlen); # payload + $hash->{BUF} = substr($hash->{BUF}, $tlen+$off); + + my $cp = ord(substr($fb,0,1)) >> 4; + my $cpt = $cptype{$cp}; + $hash->{lastMsgTime} = gettimeofday(); + + # Lowlevel debugging + if(AttrVal($name, "verbose", 1) >= 5) { + my $pltxt = $pl; + $pltxt =~ s/([^ -~])/"(".ord($1).")"/ge; + Log3 $name, 5, "$cpt: $pltxt"; + } + + #################################### + if($cpt eq "CONNACK") { + my $rc = ord(substr($fb,1,1)); + if($rc == 0) { + MQTT2_CLIENT_doinit($hash, 1); + + } else { + my @txt = ("Accepted", "bad proto", "bad id", "server unavailable", + "bad user name or password", "not authorized"); + Log3 1, $name, "$name: Connection refused, ". + ($rc <= int(@txt) ? $txt[$rc] : "unknown error $rc"); + MQTT2_CLIENT_Disco($hash); + return; + } + } elsif($cpt eq "PUBACK") { # ignore it + } elsif($cpt eq "SUBACK") { # ignore it + } elsif($cpt eq "PINGRESP") { # ignore it + } elsif($cpt eq "PUBLISH") { + my $cf = ord(substr($fb,0,1)) & 0xf; + my $qos = ($cf & 0x06) >> 1; + my ($tp, $val, $pid); + ($tp, $off) = MQTT2_CLIENT_getStr($pl, 0); + if($qos) { + $pid = unpack('n', substr($pl, $off, 2)); + $off += 2; + } + $val = substr($pl, $off); + addToWritebuffer($hash, pack("CCnC*", 0x40, 2, $pid)) if($qos); # PUBACK + + $val = "" if(!defined($val)); + my $ac = AttrVal($name, "autocreate", undef) ? "autocreate:":""; + my $cid = $hash->{clientId}; + Dispatch($hash, "$ac$cid:$tp:$val", undef, !$ac); + + my $re = AttrVal($name, "rawEvents", undef); + DoTrigger($name, "$tp:$val") if($re && $tp =~ m/$re/); + + } else { + Log 1, "M2: Unhandled packet $cpt, disconneting $name"; + MQTT2_CLIENT_Disco($hash); + } + return MQTT2_CLIENT_Read($hash, 1); +} + +###################################### +# send topic to client if its subscription matches the topic +sub +MQTT2_CLIENT_doPublish($$$$) +{ + my ($hash, $topic, $val, $retain) = @_; + my $name = $hash->{NAME}; + return if(IsDisabled($name)); + $val = "" if(!defined($val)); + addToWritebuffer($hash, + pack("C",0x30). + MQTT2_CLIENT_calcRemainingLength(2+length($topic)+length($val)). + pack("n", length($topic)). + $topic.$val); +} + +sub +MQTT2_CLIENT_Write($$$) +{ + my ($hash,$topic,$msg) = @_; + my $retain; + if($topic =~ m/^(.*):r$/) { + $topic = $1; + $retain = 1; + } + MQTT2_CLIENT_doPublish($hash, $topic, $msg, $retain); +} + +sub +MQTT2_CLIENT_calcRemainingLength($) +{ + my ($l) = @_; + my @r; + while($l > 0) { + my $eb = $l % 128; + $l = int($l/128); + $eb += 128 if($l); + push(@r, $eb); + } + return pack("C*", @r); +} + +sub +MQTT2_CLIENT_getRemainingLength($) +{ + my ($hash) = @_; + return (2,2) if(length($hash->{BUF}) < 2); + + my $ret = 0; + my $mul = 1; + for(my $off = 1; $off <= 4; $off++) { + my $b = ord(substr($hash->{BUF},$off,1)); + $ret += ($b & 0x7f)*$mul; + return ($ret, $off+1) if(($b & 0x80) == 0); + $mul *= 128; + } + return -1; +} + +sub +MQTT2_CLIENT_getStr($$) +{ + my ($in, $off) = @_; + my $l = unpack("n", substr($in, $off, 2)); + return (substr($in, $off+2, $l), $off+2+$l); +} + +1; + +=pod +=item helper +=item summary Connection to an external MQTT server +=item summary_DE Verbindung zu einem externen MQTT Server +=begin html + + +

MQTT2_CLIENT

+
    + MQTT2_CLIENT is a cleanroom implementation of an MQTT client (which connects + to an external server, like mosquitto) using no perl libraries. It serves as + an IODev to MQTT2_DEVICES. +

    + + + Define +
      + define <name> MQTT2_CLIENT <host>:<port> +

      + Connect to the server on <host> and <port>. <port> 1883 + is default for mosquitto. +
      + Notes:
      +
        +
      • only QOS 0 and 1 is implemented
      • +
      +
    +
    + + + Set +
      +
    • publish -r topic value
      + publish a message, -r denotes setting the retain flag. +

    • +
    • password <password> value
      + set the password, which is stored in the FHEM/FhemUtils/uniqueID file. +
    • +
    +
    + + + Get +
      N/A

    + + + Attributes +
      + + +
    • autocreate
      + if set, at least one MQTT2_DEVICE will be created, and its readingsList + will be expanded upon reception of published messages. Note: this is + slightly different from MQTT2_SERVER, where each connection has its own + clientId. This parameter is sadly not transferred via the MQTT protocol, + so the clientId of this MQTT2_CLIENT instance will be used. +

    • + + +
    • clientId <name>
      + set the MQTT clientId. If not set, the name of the MQTT2_CLIENT instance + is used, after deleting everything outside 0-9a-zA-Z +

    • + +
    • disable
      + disabledForIntervals
      + disable dispatching of messages. +

    • + + +
    • lwt <topic> <message>
      + set the LWT (last will and testament) topic and message, default is empty. +

    • + + +
    • lwtRetain
      + if set, the lwt retain flag is set +

    • + + +
    • mqttVersion 3.1,3.1.1
      + set the MQTT protocol version in the CONNECT header, default is 3.1 +

    • + + +
    • rawEvents <topic-regexp>
      + send all messages as events attributed to this MQTT2_CLIENT instance. + Should only be used, if there is no MQTT2_DEVICE to process the topic. +

    • + + +
    • subscriptions <subscriptions>
      + space separated list of MQTT subscriptions, default is # +

    • + + +
    • SSL
      + Enable SSL (i.e. TLS) +

    • + + +
    • username <username>
      + set the username. The password is set via the set command, and is stored + separately, see above. +

    • + +
    +
+=end html + +=cut diff --git a/FHEM/00_MQTT2_SERVER.pm b/FHEM/00_MQTT2_SERVER.pm index 5addb0aef..86cfa624e 100644 --- a/FHEM/00_MQTT2_SERVER.pm +++ b/FHEM/00_MQTT2_SERVER.pm @@ -253,9 +253,11 @@ MQTT2_SERVER_Read($@) $hash->{lastMsgTime} = gettimeofday(); # Lowlevel debugging - # my $pltxt = $pl; - # $pltxt =~ s/([^ -~])/"(".ord($1).")"/ge; - # Log3 $sname, 5, "$pltxt"; + if(AttrVal($sname, "verbose", 1) >= 5) { + my $pltxt = $pl; + $pltxt =~ s/([^ -~])/"(".ord($1).")"/ge; + Log3 $sname, 5, "$cpt: $pltxt"; + } if(!defined($hash->{cid}) && $cpt ne "CONNECT") { Log3 $sname, 2, "$cname $cpt before CONNECT, disconnecting"; diff --git a/FHEM/10_MQTT2_DEVICE.pm b/FHEM/10_MQTT2_DEVICE.pm index 5dc235409..57db0d67b 100644 --- a/FHEM/10_MQTT2_DEVICE.pm +++ b/FHEM/10_MQTT2_DEVICE.pm @@ -144,6 +144,7 @@ MQTT2_DEVICE_Parse($$) } last; } + return if(!$newCid); my $cidHash = $modules{MQTT2_DEVICE}{defptr}{cid}{$newCid}; my $nn = $cidHash ? $cidHash->{NAME} : "MQTT2_$newCid"; @@ -383,8 +384,8 @@ MQTT2_DEVICE_Undef($$) 1; =pod -=item summary devices communicating via the MQTT2_SERVER -=item summary_DE über den MQTT2_SERVER kommunizierende Geräte +=item summary devices communicating via the MQTT2_SERVER or MQTT2_CLIENT +=item summary_DE über den MQTT2_SERVER oder MQTT2_CLIENT kommunizierende Geräte =begin html diff --git a/FHEM/DevIo.pm b/FHEM/DevIo.pm index 7f85bc7e8..76a8bc06a 100644 --- a/FHEM/DevIo.pm +++ b/FHEM/DevIo.pm @@ -358,6 +358,7 @@ DevIo_OpenDev($$$;$) $hash->{TCPDev} = $conn; $hash->{FD} = $conn->fileno(); + $hash->{CD} = $conn; $selectlist{"$name.$dev"} = $hash; return 1; }; diff --git a/MAINTAINER.txt b/MAINTAINER.txt index 1c19d30c1..071c1f4dc 100644 --- a/MAINTAINER.txt +++ b/MAINTAINER.txt @@ -29,6 +29,7 @@ FHEM/00_LIRC.pm rudolfkoenig Sonstiges FHEM/00_MAXLAN.pm rudolfkoenig/orphan MAX FHEM/00_MQTT.pm hexenmeister MQTT FHEM/00_MQTT2_SERVER.pm rudolfkoenig MQTT +FHEM/00_MQTT2_CLIENT.pm rudolfkoenig MQTT FHEM/00_MYSENSORS.pm Hauswart Sonstige Systeme FHEM/00_NetzerI2C.pm klausw Sonstige Systeme FHEM/00_Neuron.pm klausw Sonstige Systeme diff --git a/fhem.pl b/fhem.pl index f9ea2e727..5822a9cf7 100755 --- a/fhem.pl +++ b/fhem.pl @@ -303,7 +303,7 @@ my @globalAttrList = qw( archivedir archivesort:timestamp,alphanum archiveCompress - autoload_undefined_devices:1,0 + autoload_undefined_devices:0,1 autosave:1,0 backup_before_update backupcmd @@ -3806,7 +3806,7 @@ Dispatch($$;$$) if($dmsg =~ m/$h->{$m}/) { my ($order, $mname) = split(":", $m); - if($attr{global}{autoload_undefined_devices}) { + if(AttrVal("global", "autoload_undefined_devices", 1)) { my $newm = LoadModule($mname); $mname = $newm if($newm ne "UNDEFINED"); if($modules{$mname} && $modules{$mname}{ParseFn}) {