mirror of
https://github.com/fhem/fhem-mirror.git
synced 2025-05-04 22:19:38 +00:00
00_MQTT2_CLIENT.pm: "MQTT_GENERiC_BRIDGE" changes (Forum #93255)
git-svn-id: https://svn.fhem.de/fhem/trunk/fhem@17757 2b470e98-0d58-463d-a4d8-8e2adae1ed80
This commit is contained in:
parent
17e5be0c15
commit
44ef21b0db
@ -9,6 +9,7 @@ use DevIo;
|
|||||||
sub MQTT2_CLIENT_Read($@);
|
sub MQTT2_CLIENT_Read($@);
|
||||||
sub MQTT2_CLIENT_Write($$$);
|
sub MQTT2_CLIENT_Write($$$);
|
||||||
sub MQTT2_CLIENT_Undef($@);
|
sub MQTT2_CLIENT_Undef($@);
|
||||||
|
sub MQTT2_CLIENT_doPublish($@);
|
||||||
|
|
||||||
my $keepalive = 30;
|
my $keepalive = 30;
|
||||||
|
|
||||||
@ -30,7 +31,8 @@ MQTT2_CLIENT_Initialize($)
|
|||||||
$hash->{AttrFn} = "MQTT2_CLIENT_Attr";
|
$hash->{AttrFn} = "MQTT2_CLIENT_Attr";
|
||||||
$hash->{SetFn} = "MQTT2_CLIENT_Set";
|
$hash->{SetFn} = "MQTT2_CLIENT_Set";
|
||||||
$hash->{UndefFn} = "MQTT2_CLIENT_Undef";
|
$hash->{UndefFn} = "MQTT2_CLIENT_Undef";
|
||||||
$hash->{DeleteFn}= "MQTT2_CLIENT_Delete";
|
$hash->{ShutdownFn} = "MQTT2_CLIENT_Undef";
|
||||||
|
$hash->{DeleteFn} = "MQTT2_CLIENT_Delete";
|
||||||
$hash->{WriteFn} = "MQTT2_CLIENT_Write";
|
$hash->{WriteFn} = "MQTT2_CLIENT_Write";
|
||||||
$hash->{ReadyFn} = "MQTT2_CLIENT_connect";
|
$hash->{ReadyFn} = "MQTT2_CLIENT_connect";
|
||||||
|
|
||||||
@ -42,8 +44,9 @@ MQTT2_CLIENT_Initialize($)
|
|||||||
disabledForIntervals
|
disabledForIntervals
|
||||||
lwt
|
lwt
|
||||||
lwtRetain
|
lwtRetain
|
||||||
|
msgAfterConnect
|
||||||
|
msgBeforeDisconnect
|
||||||
mqttVersion:3.1.1,3.1
|
mqttVersion:3.1.1,3.1
|
||||||
onConnect
|
|
||||||
rawEvents
|
rawEvents
|
||||||
subscriptions
|
subscriptions
|
||||||
SSL
|
SSL
|
||||||
@ -124,10 +127,14 @@ MQTT2_CLIENT_doinit($)
|
|||||||
|
|
||||||
############################## SUBSCRIBE
|
############################## SUBSCRIBE
|
||||||
} elsif($hash->{connecting} == 2) {
|
} elsif($hash->{connecting} == 2) {
|
||||||
|
my $s = AttrVal($name, "subscriptions", "#");
|
||||||
|
if($s eq "setByTheProgram") {
|
||||||
|
$s = ($hash->{".subscribe"} ? $hash->{".subscribe"} : "#");
|
||||||
|
}
|
||||||
my $msg =
|
my $msg =
|
||||||
pack("n", $hash->{FD}). # packed Identifier
|
pack("n", $hash->{FD}). # packed Identifier
|
||||||
join("", map { pack("n", length($_)).$_.pack("C",0) } # QOS:0
|
join("", map { pack("n", length($_)).$_.pack("C",0) } # QOS:0
|
||||||
split(" ", AttrVal($name, "subscriptions", "#")));
|
split(" ", $s));
|
||||||
addToWritebuffer($hash,
|
addToWritebuffer($hash,
|
||||||
pack("C",0x80).
|
pack("C",0x80).
|
||||||
MQTT2_CLIENT_calcRemainingLength(length($msg)).$msg);
|
MQTT2_CLIENT_calcRemainingLength(length($msg)).$msg);
|
||||||
@ -144,10 +151,7 @@ MQTT2_CLIENT_keepalive($)
|
|||||||
my $name = $hash->{NAME};
|
my $name = $hash->{NAME};
|
||||||
return if(ReadingsVal($name, "state", "") ne "opened");
|
return if(ReadingsVal($name, "state", "") ne "opened");
|
||||||
Log3 $name, 5, "$name: keepalive $keepalive";
|
Log3 $name, 5, "$name: keepalive $keepalive";
|
||||||
my $msg = join("", map { pack("n", length($_)).$_.pack("C",0) } # QOS:0
|
addToWritebuffer($hash, pack("C",0xC0).pack("C",0)); # PINGREQ
|
||||||
split(" ", AttrVal($name, "subscriptions", "#")));
|
|
||||||
addToWritebuffer($hash,
|
|
||||||
pack("C",0xC0).pack("C",0));
|
|
||||||
InternalTimer(gettimeofday()+$keepalive, "MQTT2_CLIENT_keepalive", $hash, 0);
|
InternalTimer(gettimeofday()+$keepalive, "MQTT2_CLIENT_keepalive", $hash, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -155,10 +159,27 @@ sub
|
|||||||
MQTT2_CLIENT_Undef($@)
|
MQTT2_CLIENT_Undef($@)
|
||||||
{
|
{
|
||||||
my ($hash, $arg) = @_;
|
my ($hash, $arg) = @_;
|
||||||
|
RemoveInternalTimer($hash);
|
||||||
|
my $ond = AttrVal($hash->{NAME}, "msgBeforeDisconnect", "");
|
||||||
|
MQTT2_CLIENT_doPublish($hash, split(" ", $ond, 2), 0, 1) if($ond);
|
||||||
|
DevIo_SimpleWrite($hash, pack("C",0xE0).pack("C",0), 0); # DISCONNECT
|
||||||
DevIo_CloseDev($hash);
|
DevIo_CloseDev($hash);
|
||||||
return undef;
|
return undef;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sub
|
||||||
|
MQTT2_CLIENT_Disco($)
|
||||||
|
{
|
||||||
|
my ($hash) = @_;
|
||||||
|
RemoveInternalTimer($hash);
|
||||||
|
$hash->{connecting} = 1;
|
||||||
|
my $ond = AttrVal($hash->{NAME}, "msgBeforeDisconnect", "");
|
||||||
|
MQTT2_CLIENT_doPublish($hash, split(" ", $ond, 2), 0, 0) if($ond);
|
||||||
|
addToWritebuffer($hash, pack("C",0xE0).pack("C",0)); # DISCONNECT
|
||||||
|
DevIo_Disconnected($hash);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
sub
|
sub
|
||||||
MQTT2_CLIENT_Delete($@)
|
MQTT2_CLIENT_Delete($@)
|
||||||
{
|
{
|
||||||
@ -190,15 +211,6 @@ MQTT2_CLIENT_Attr(@)
|
|||||||
return undef;
|
return undef;
|
||||||
}
|
}
|
||||||
|
|
||||||
sub
|
|
||||||
MQTT2_CLIENT_Disco($)
|
|
||||||
{
|
|
||||||
my ($hash) = @_;
|
|
||||||
RemoveInternalTimer($hash);
|
|
||||||
$hash->{connecting} = 1;
|
|
||||||
DevIo_Disconnected($hash);
|
|
||||||
}
|
|
||||||
|
|
||||||
sub
|
sub
|
||||||
MQTT2_CLIENT_Set($@)
|
MQTT2_CLIENT_Set($@)
|
||||||
{
|
{
|
||||||
@ -291,7 +303,7 @@ MQTT2_CLIENT_Read($@)
|
|||||||
if($cpt eq "CONNACK") {
|
if($cpt eq "CONNACK") {
|
||||||
my $rc = ord(substr($pl,1,1));
|
my $rc = ord(substr($pl,1,1));
|
||||||
if($rc == 0) {
|
if($rc == 0) {
|
||||||
my $onc = AttrVal($name, "onConnect", "");
|
my $onc = AttrVal($name, "msgAfterConnect", "");
|
||||||
MQTT2_CLIENT_doPublish($hash, split(" ", $onc, 2)) if($onc);
|
MQTT2_CLIENT_doPublish($hash, split(" ", $onc, 2)) if($onc);
|
||||||
MQTT2_CLIENT_doinit($hash);
|
MQTT2_CLIENT_doinit($hash);
|
||||||
|
|
||||||
@ -340,30 +352,47 @@ MQTT2_CLIENT_Read($@)
|
|||||||
######################################
|
######################################
|
||||||
# send topic to client if its subscription matches the topic
|
# send topic to client if its subscription matches the topic
|
||||||
sub
|
sub
|
||||||
MQTT2_CLIENT_doPublish($$$$)
|
MQTT2_CLIENT_doPublish($@)
|
||||||
{
|
{
|
||||||
my ($hash, $topic, $val, $retain) = @_;
|
my ($hash, $topic, $val, $retain, $immediate) = @_;
|
||||||
my $name = $hash->{NAME};
|
my $name = $hash->{NAME};
|
||||||
return if(IsDisabled($name));
|
return if(IsDisabled($name));
|
||||||
$val = "" if(!defined($val));
|
$val = "" if(!defined($val));
|
||||||
Log3 $name, 5, "$name: sending PUBLISH $topic $val";
|
Log3 $name, 5, "$name: sending PUBLISH $topic $val";
|
||||||
addToWritebuffer($hash,
|
|
||||||
pack("C",0x30).
|
my $msg = pack("C",0x30).
|
||||||
MQTT2_CLIENT_calcRemainingLength(2+length($topic)+length($val)).
|
MQTT2_CLIENT_calcRemainingLength(2+length($topic)+length($val)).
|
||||||
pack("n", length($topic)).
|
pack("n", length($topic)).
|
||||||
$topic.$val);
|
$topic.$val;
|
||||||
|
if($immediate) {
|
||||||
|
DevIo_SimpleWrite($hash, $msg, 0);
|
||||||
|
} else {
|
||||||
|
addToWritebuffer($hash, $msg);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sub
|
sub
|
||||||
MQTT2_CLIENT_Write($$$)
|
MQTT2_CLIENT_Write($$$)
|
||||||
{
|
{
|
||||||
my ($hash,$topic,$msg) = @_;
|
my ($hash, $function, $topicMsg) = @_;
|
||||||
|
|
||||||
|
if($function eq "publish") {
|
||||||
|
my ($topic, $msg) = split(" ", $topicMsg, 2);
|
||||||
my $retain;
|
my $retain;
|
||||||
if($topic =~ m/^(.*):r$/) {
|
if($topic =~ m/^(.*):r$/) {
|
||||||
$topic = $1;
|
$topic = $1;
|
||||||
$retain = 1;
|
$retain = 1;
|
||||||
}
|
}
|
||||||
MQTT2_CLIENT_doPublish($hash, $topic, $msg, $retain);
|
MQTT2_CLIENT_doPublish($hash, $topic, $msg, $retain);
|
||||||
|
|
||||||
|
} elsif($function eq "subscribe") {
|
||||||
|
$hash->{".subscribtion"} = $topicMsg;
|
||||||
|
MQTT2_CLIENT_Disco($hash);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
my $name = $hash->{NAME};
|
||||||
|
Log3 $name, 1, "$name: ERROR: Ignoring function $function";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sub
|
sub
|
||||||
@ -491,11 +520,16 @@ MQTT2_CLIENT_getStr($$)
|
|||||||
set the MQTT protocol version in the CONNECT header, default is 3.1
|
set the MQTT protocol version in the CONNECT header, default is 3.1
|
||||||
</li></br>
|
</li></br>
|
||||||
|
|
||||||
<a name="onConnect"></a>
|
<a name="msgAfterConnect"></a>
|
||||||
<li>onConnect topic message<br>
|
<li>msgAfterConnect topic message<br>
|
||||||
publish the topic after each connect or reconnect.
|
publish the topic after each connect or reconnect.
|
||||||
</li></br>
|
</li></br>
|
||||||
|
|
||||||
|
<a name="msgBeforeDisconnect"></a>
|
||||||
|
<li>msgBeforeDisconnect topic message<br>
|
||||||
|
publish the topic bofore each disconnect.
|
||||||
|
</li></br>
|
||||||
|
|
||||||
<a name="rawEvents"></a>
|
<a name="rawEvents"></a>
|
||||||
<li>rawEvents <topic-regexp><br>
|
<li>rawEvents <topic-regexp><br>
|
||||||
send all messages as events attributed to this MQTT2_CLIENT instance.
|
send all messages as events attributed to this MQTT2_CLIENT instance.
|
||||||
@ -504,7 +538,9 @@ MQTT2_CLIENT_getStr($$)
|
|||||||
|
|
||||||
<a name="subscriptions"></a>
|
<a name="subscriptions"></a>
|
||||||
<li>subscriptions <subscriptions><br>
|
<li>subscriptions <subscriptions><br>
|
||||||
space separated list of MQTT subscriptions, default is #
|
space separated list of MQTT subscriptions, default is #<br>
|
||||||
|
Note: if the value is the literal setByTheProgram, then the value sent by
|
||||||
|
the client (e.g. MQTT_GENERIC_BRIDGE) is used.
|
||||||
</li><br>
|
</li><br>
|
||||||
|
|
||||||
<a name="SSL"></a>
|
<a name="SSL"></a>
|
||||||
|
@ -378,6 +378,7 @@ MQTT2_SERVER_Read($@)
|
|||||||
####################################
|
####################################
|
||||||
} elsif($cpt eq "DISCONNECT") {
|
} elsif($cpt eq "DISCONNECT") {
|
||||||
Log3 $sname, 4, "$cname $hash->{cid} $cpt";
|
Log3 $sname, 4, "$cname $hash->{cid} $cpt";
|
||||||
|
delete($hash->{lwt}); # no LWT on disconnect, see doc, chapter 3.14
|
||||||
CommandDelete(undef, $cname);
|
CommandDelete(undef, $cname);
|
||||||
|
|
||||||
####################################
|
####################################
|
||||||
@ -457,13 +458,21 @@ MQTT2_SERVER_terminate($$)
|
|||||||
sub
|
sub
|
||||||
MQTT2_SERVER_Write($$$)
|
MQTT2_SERVER_Write($$$)
|
||||||
{
|
{
|
||||||
my ($hash,$topic,$msg) = @_;
|
my ($hash,$function,$topicMsg) = @_;
|
||||||
|
|
||||||
|
if($function eq "publish") {
|
||||||
|
my ($topic, $msg) = split(" ", $topicMsg, 2);
|
||||||
my $retain;
|
my $retain;
|
||||||
if($topic =~ m/^(.*):r$/) {
|
if($topic =~ m/^(.*):r$/) {
|
||||||
$topic = $1;
|
$topic = $1;
|
||||||
$retain = 1;
|
$retain = 1;
|
||||||
}
|
}
|
||||||
MQTT2_SERVER_doPublish($hash, $hash, $topic, $msg, $retain);
|
MQTT2_SERVER_doPublish($hash, $hash, $topic, $msg, $retain);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
my $name = $hash->{NAME};
|
||||||
|
Log3 $name, 1, "$name: ERROR: Ignoring function $function";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sub
|
sub
|
||||||
|
@ -251,7 +251,7 @@ MQTT2_DEVICE_Get($@)
|
|||||||
|
|
||||||
$cmd = MQTT2_buildCmd($hash, \@a, $cmd);
|
$cmd = MQTT2_buildCmd($hash, \@a, $cmd);
|
||||||
return if(!$cmd);
|
return if(!$cmd);
|
||||||
IOWrite($hash, split(" ",$cmd,2));
|
IOWrite($hash, "publish", $cmd);
|
||||||
|
|
||||||
return undef;
|
return undef;
|
||||||
}
|
}
|
||||||
@ -271,7 +271,7 @@ MQTT2_DEVICE_Set($@)
|
|||||||
|
|
||||||
$cmd = MQTT2_buildCmd($hash, \@a, $cmd);
|
$cmd = MQTT2_buildCmd($hash, \@a, $cmd);
|
||||||
return if(!$cmd);
|
return if(!$cmd);
|
||||||
IOWrite($hash, split(" ",$cmd,2));
|
IOWrite($hash, "publish", $cmd);
|
||||||
readingsSingleUpdate($hash, "state", $cmdName, 1);
|
readingsSingleUpdate($hash, "state", $cmdName, 1);
|
||||||
return undef;
|
return undef;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user