MQTT: refactor to get rid of 'constant subroutine redifined'-messages caused by import of Net::MQTT

git-svn-id: https://svn.fhem.de/fhem/trunk@6653 2b470e98-0d58-463d-a4d8-8e2adae1ed80
This commit is contained in:
ntruchsess 2014-10-02 11:59:37 +00:00
parent 99b96cce4f
commit f01a844bf5
4 changed files with 216 additions and 154 deletions

View File

@ -23,15 +23,6 @@
# #
############################################## ##############################################
use strict;
use warnings;
use GPUtils qw(:all);
use Net::MQTT::Constants;
use Net::MQTT::Message;
require "10_MQTT_DEVICE.pm";
my %sets = ( my %sets = (
"connect" => "", "connect" => "",
"disconnect" => "", "disconnect" => "",
@ -53,36 +44,67 @@ sub MQTT_Initialize($) {
# Provider # Provider
$hash->{Clients} = join (':',@clients); $hash->{Clients} = join (':',@clients);
$hash->{ReadyFn} = "MQTT_Ready"; $hash->{ReadyFn} = "MQTT::Ready";
$hash->{ReadFn} = "MQTT_Read"; $hash->{ReadFn} = "MQTT::Read";
# Consumer # Consumer
$hash->{DefFn} = "MQTT_Define"; $hash->{DefFn} = "MQTT::Define";
$hash->{UndefFn} = "MQTT_Undef"; $hash->{UndefFn} = "MQTT::Undef";
$hash->{SetFn} = "MQTT_Set"; $hash->{SetFn} = "MQTT::Set";
$hash->{NotifyFn} = "MQTT_Notify"; $hash->{NotifyFn} = "MQTT::Notify";
$hash->{AttrList} = "keep-alive"; $hash->{AttrList} = "keep-alive";
} }
sub MQTT_Define($$) { package MQTT;
use Exporter ('import');
@EXPORT = ();
@EXPORT_OK = qw(send_publish send_subscribe send_unsubscribe client_attr);
%EXPORT_TAGS = (all => [@EXPORT_OK]);
use strict;
use warnings;
use GPUtils qw(:all);
use Net::MQTT::Constants;
use Net::MQTT::Message;
our %qos = map {qos_string($_) => $_} (MQTT_QOS_AT_MOST_ONCE,MQTT_QOS_AT_LEAST_ONCE,MQTT_QOS_EXACTLY_ONCE);
BEGIN {GP_Import(qw(
gettimeofday
readingsSingleUpdate
DevIo_OpenDev
DevIo_SimpleWrite
DevIo_SimpleRead
DevIo_CloseDev
RemoveInternalTimer
InternalTimer
AttrVal
Log3
AssignIoPort
))};
sub Define($$) {
my ( $hash, $def ) = @_; my ( $hash, $def ) = @_;
$hash->{NOTIFYDEV} = "global"; $hash->{NOTIFYDEV} = "global";
$hash->{msgid} = 1; $hash->{msgid} = 1;
if ($main::init_done) { if ($main::init_done) {
return MQTT_Start($hash); return Start($hash);
} else { } else {
return undef; return undef;
} }
} }
sub MQTT_Undef($) { sub Undef($) {
MQTT_Stop(shift); Stop(shift);
} }
sub MQTT_Set($@) { sub Set($@) {
my ($hash, @a) = @_; my ($hash, @a) = @_;
return "Need at least one parameters" if(@a < 2); return "Need at least one parameters" if(@a < 2);
return "Unknown argument $a[1], choose one of " . join(" ", sort keys %sets) return "Unknown argument $a[1], choose one of " . join(" ", sort keys %sets)
@ -92,64 +114,64 @@ sub MQTT_Set($@) {
COMMAND_HANDLER: { COMMAND_HANDLER: {
$command eq "connect" and do { $command eq "connect" and do {
MQTT_Start($hash); Start($hash);
last; last;
}; };
$command eq "disconnect" and do { $command eq "disconnect" and do {
MQTT_Stop($hash); Stop($hash);
last; last;
}; };
}; };
} }
sub MQTT_Notify($$) { sub Notify($$) {
my ($hash,$dev) = @_; my ($hash,$dev) = @_;
if( grep(m/^(INITIALIZED|REREADCFG)$/, @{$dev->{CHANGED}}) ) { if( grep(m/^(INITIALIZED|REREADCFG)$/, @{$dev->{CHANGED}}) ) {
MQTT_Start($hash); Start($hash);
} elsif( grep(m/^SAVE$/, @{$dev->{CHANGED}}) ) { } elsif( grep(m/^SAVE$/, @{$dev->{CHANGED}}) ) {
} }
} }
sub MQTT_Start($) { sub Start($) {
my $hash = shift; my $hash = shift;
my ($dev) = split("[ \t]+", $hash->{DEF}); my ($dev) = split("[ \t]+", $hash->{DEF});
$hash->{DeviceName} = $dev; $hash->{DeviceName} = $dev;
DevIo_CloseDev($hash); DevIo_CloseDev($hash);
return DevIo_OpenDev($hash, 0, "MQTT_Init"); return DevIo_OpenDev($hash, 0, "MQTT::Init");
} }
sub MQTT_Stop($) { sub Stop($) {
my $hash = shift; my $hash = shift;
MQTT_send_disconnect($hash); send_disconnect($hash);
DevIo_CloseDev($hash); DevIo_CloseDev($hash);
main::RemoveInternalTimer($hash); RemoveInternalTimer($hash);
main::readingsSingleUpdate($hash,"connection","disconnected",1); readingsSingleUpdate($hash,"connection","disconnected",1);
} }
sub MQTT_Ready($) { sub Ready($) {
my $hash = shift; my $hash = shift;
return DevIo_OpenDev($hash, 1, "MQTT_Init") if($hash->{STATE} eq "disconnected"); return DevIo_OpenDev($hash, 1, "MQTT::Init") if($hash->{STATE} eq "disconnected");
} }
sub MQTT_Init($) { sub Init($) {
my $hash = shift; my $hash = shift;
MQTT_send_connect($hash); send_connect($hash);
main::readingsSingleUpdate($hash,"connection","connecting",1); readingsSingleUpdate($hash,"connection","connecting",1);
$hash->{ping_received}=1; $hash->{ping_received}=1;
MQTT_Timer($hash); Timer($hash);
return undef; return undef;
} }
sub MQTT_Timer($) { sub Timer($) {
my $hash = shift; my $hash = shift;
main::RemoveInternalTimer($hash); RemoveInternalTimer($hash);
main::readingsSingleUpdate($hash,"connection","timed-out",1) unless $hash->{ping_received}; readingsSingleUpdate($hash,"connection","timed-out",1) unless $hash->{ping_received};
$hash->{ping_received} = 0; $hash->{ping_received} = 0;
main::InternalTimer(gettimeofday()+main::AttrVal($hash-> {NAME},"keep-alive",60), "MQTT_Timer", $hash, 0); InternalTimer(gettimeofday()+AttrVal($hash-> {NAME},"keep-alive",60), "MQTT::Timer", $hash, 0);
MQTT_send_ping($hash); send_ping($hash);
} }
sub MQTT_Read { sub Read {
my ($hash) = @_; my ($hash) = @_;
my $name = $hash->{NAME}; my $name = $hash->{NAME};
my $buf = DevIo_SimpleRead($hash); my $buf = DevIo_SimpleRead($hash);
@ -158,12 +180,12 @@ sub MQTT_Read {
while (my $mqtt = Net::MQTT::Message->new_from_bytes($hash->{buf},1)) { while (my $mqtt = Net::MQTT::Message->new_from_bytes($hash->{buf},1)) {
my $message_type = $mqtt->message_type(); my $message_type = $mqtt->message_type();
main::Log3($name,5,"MQTT $name message received: ".$mqtt->string()); Log3($name,5,"MQTT $name message received: ".$mqtt->string());
MESSAGE_TYPE: { MESSAGE_TYPE: {
$message_type == MQTT_CONNACK and do { $message_type == MQTT_CONNACK and do {
readingsSingleUpdate($hash,"connection","connected",1); readingsSingleUpdate($hash,"connection","connected",1);
GP_ForallClients($hash,\&MQTT_client_start); GP_ForallClients($hash,\&client_start);
last; last;
}; };
@ -171,13 +193,13 @@ sub MQTT_Read {
my $topic = $mqtt->topic(); my $topic = $mqtt->topic();
GP_ForallClients($hash,sub { GP_ForallClients($hash,sub {
my $client = shift; my $client = shift;
main::Log3($client->{NAME},5,"publish received for $topic, ".$mqtt->message()); Log3($client->{NAME},5,"publish received for $topic, ".$mqtt->message());
if (grep { $_ eq $topic } @{$client->{subscribe}}) { if (grep { $_ eq $topic } @{$client->{subscribe}}) {
readingsSingleUpdate($client,"transmission-state","publish received",1); readingsSingleUpdate($client,"transmission-state","publish received",1);
if ($client->{TYPE} eq "MQTT_DEVICE") { if ($client->{TYPE} eq "MQTT_DEVICE") {
MQTT_DEVICE_onmessage($client,$topic,$mqtt->message()); MQTT::DEVICE::onmessage($client,$topic,$mqtt->message());
} else { } else {
MQTT_BRIDGE_onmessage($client,$topic,$mqtt->message()); MQTT::BRIDGE::onmessage($client,$topic,$mqtt->message());
} }
}; };
},undef); },undef);
@ -258,95 +280,123 @@ sub MQTT_Read {
$message_type == MQTT_PINGRESP and do { $message_type == MQTT_PINGRESP and do {
$hash->{ping_received} = 1; $hash->{ping_received} = 1;
main::readingsSingleUpdate($hash,"connection","active",1); readingsSingleUpdate($hash,"connection","active",1);
last; last;
}; };
main::Log3($hash->{NAME},4,"MQTT_Read '$hash->{NAME}' unexpected message type '".message_type_string($message_type)."'"); Log3($hash->{NAME},4,"MQTT::Read '$hash->{NAME}' unexpected message type '".message_type_string($message_type)."'");
} }
} }
return undef; return undef;
}; };
sub MQTT_send_connect($) { sub send_connect($) {
my $hash = shift; my $hash = shift;
return MQTT_send_message($hash, message_type => MQTT_CONNECT, keep_alive_timer => main::AttrVal($hash->{NAME},"keep-alive",60)); return send_message($hash, message_type => MQTT_CONNECT, keep_alive_timer => AttrVal($hash->{NAME},"keep-alive",60));
}; };
sub MQTT_send_publish($@) { sub send_publish($@) {
return MQTT_send_message(shift, message_type => MQTT_PUBLISH, @_); return send_message(shift, message_type => MQTT_PUBLISH, @_);
}; };
sub MQTT_send_subscribe($@) { sub send_subscribe($@) {
my $hash = shift; my $hash = shift;
return MQTT_send_message($hash, message_type => MQTT_SUBSCRIBE, @_); return send_message($hash, message_type => MQTT_SUBSCRIBE, @_);
}; };
sub MQTT_send_unsubscribe($@) { sub send_unsubscribe($@) {
return MQTT_send_message(shift, message_type => MQTT_UNSUBSCRIBE, @_); return send_message(shift, message_type => MQTT_UNSUBSCRIBE, @_);
}; };
sub MQTT_send_ping($) { sub send_ping($) {
return MQTT_send_message(shift, message_type => MQTT_PINGREQ); return send_message(shift, message_type => MQTT_PINGREQ);
}; };
sub MQTT_send_disconnect($) { sub send_disconnect($) {
return MQTT_send_message(shift, message_type => MQTT_DISCONNECT); return send_message(shift, message_type => MQTT_DISCONNECT);
}; };
sub MQTT_send_message($$$@) { sub send_message($$$@) {
my $hash = shift; my $hash = shift;
my $name = $hash->{NAME}; my $name = $hash->{NAME};
my $msgid = $hash->{msgid}++; my $msgid = $hash->{msgid}++;
my $msg = Net::MQTT::Message->new(message_id => $msgid,@_); my $msg = Net::MQTT::Message->new(message_id => $msgid,@_);
main::Log3($name,5,"MQTT $name message sent: ".$msg->string()); Log3($name,5,"MQTT $name message sent: ".$msg->string());
DevIo_SimpleWrite($hash,$msg->bytes,undef); DevIo_SimpleWrite($hash,$msg->bytes,undef);
return $msgid; return $msgid;
}; };
sub MQTT_client_define($$) { sub Client_Define($$) {
my ( $client, $def ) = @_; my ( $client, $def ) = @_;
$client->{NOTIFYDEV} = $client->{DEF} if $client->{DEF}; $client->{NOTIFYDEV} = $client->{DEF} if $client->{DEF};
$client->{qos} = MQTT_QOS_AT_MOST_ONCE; $client->{qos} = MQTT_QOS_AT_MOST_ONCE;
$client->{subscribe} = []; $client->{subscribe} = [];
if ($main::init_done) { if ($main::init_done) {
return MQTT_client_start($client); return client_start($client);
} else { } else {
return undef; return undef;
} }
};
sub Client_Undefine($) {
client_stop(shift);
};
sub client_attr($$$$$) {
my ($client,$command,$name,$attribute,$value) = @_;
ATTRIBUTE_HANDLER: {
$attribute eq "qos" and do {
if ($command eq "set") {
$client->{qos} = $MQTT::qos{$value};
} else {
$client->{qos} = MQTT_QOS_AT_MOST_ONCE;
} }
last;
sub MQTT_client_undefine($) { };
MQTT_client_stop(shift); $attribute eq "IODev" and do {
if ($main::init_done) {
if ($command eq "set") {
client_stop($client);
$main::attr{$name}{IODev} = $value;
client_start($client);
} else {
client_stop($client);
} }
}
last;
};
}
};
sub client_start($) {
sub MQTT_client_start($) {
my $client = shift; my $client = shift;
AssignIoPort($client);
my $name = $client->{NAME}; my $name = $client->{NAME};
if (! (defined AttrVal($name,"stateFormat",undef))) { if (! (defined AttrVal($name,"stateFormat",undef))) {
$main::attr{$name}{stateFormat} = "transmission-state"; $main::attr{$name}{stateFormat} = "transmission-state";
} }
if (@{$client->{subscribe}}) { if (@{$client->{subscribe}}) {
my $msgid = MQTT_send_subscribe($client->{IODev}, my $msgid = send_subscribe($client->{IODev},
topics => [map { [$_ => $client->{qos} || MQTT_QOS_AT_MOST_ONCE] } @{$client->{subscribe}}], topics => [map { [$_ => $client->{qos} || MQTT_QOS_AT_MOST_ONCE] } @{$client->{subscribe}}],
); );
$client->{message_ids}->{$msgid}++; $client->{message_ids}->{$msgid}++;
readingsSingleUpdate($client,"transmission-state","subscribe sent",1) readingsSingleUpdate($client,"transmission-state","subscribe sent",1);
}
} }
};
sub MQTT_client_stop($) { sub client_stop($) {
my $client = shift; my $client = shift;
if (@{$client->{subscribe}}) { if (@{$client->{subscribe}}) {
my $msgid = MQTT_send_unsubscribe($client->{IODev}, my $msgid = send_unsubscribe($client->{IODev},
topics => [@{$client->{subscribe}}], topics => [@{$client->{subscribe}}],
); );
$client->{message_ids}->{$msgid}++; $client->{message_ids}->{$msgid}++;
readingsSingleUpdate($client,"transmission-state","unsubscribe sent",1) readingsSingleUpdate($client,"transmission-state","unsubscribe sent",1);
}
} }
};
1; 1;

View File

@ -25,10 +25,6 @@
use strict; use strict;
use warnings; use warnings;
use GPUtils qw(:all);
use Net::MQTT::Constants;
use Net::MQTT::Message;
my %sets = ( my %sets = (
); );
@ -38,31 +34,51 @@ my %gets = (
"readings" => "" "readings" => ""
); );
my %qos = map {qos_string($_) => $_} (MQTT_QOS_AT_MOST_ONCE,MQTT_QOS_AT_LEAST_ONCE,MQTT_QOS_EXACTLY_ONCE);
sub MQTT_BRIDGE_Initialize($) { sub MQTT_BRIDGE_Initialize($) {
my $hash = shift @_; my $hash = shift @_;
# Consumer # Consumer
$hash->{DefFn} = "MQTT_client_define"; $hash->{DefFn} = "MQTT::Client_Define";
$hash->{UndefFn} = "MQTT_client_undefine"; $hash->{UndefFn} = "MQTT::Client_Undefine";
$hash->{GetFn} = "MQTT_BRIDGE_Get"; $hash->{GetFn} = "MQTT::BRIDGE::Get";
$hash->{NotifyFn} = "MQTT_BRIDGE_Notify"; $hash->{NotifyFn} = "MQTT::BRIDGE::Notify";
$hash->{AttrFn} = "MQTT_BRIDGE_Attr"; $hash->{AttrFn} = "MQTT::BRIDGE::Attr";
$hash->{AttrList} = $hash->{AttrList} =
"IODev ". "IODev ".
"qos:".join(",",keys %qos)." ". "qos:".join(",",keys %MQTT::qos)." ".
"publish-topic-base ". "publish-topic-base ".
"publishState ". "publishState ".
"publishReading_.* ". "publishReading_.* ".
"subscribeSet ". "subscribeSet ".
"subscribeSet_.* ". "subscribeSet_.* ".
$main::readingFnAttributes; $main::readingFnAttributes;
main::LoadModule("MQTT");
} }
sub MQTT_BRIDGE_Get($$@) { package MQTT::BRIDGE;
use strict;
use warnings;
use GPUtils qw(:all);
use Net::MQTT::Constants;
BEGIN {
MQTT->import(qw(:all));
GP_Import(qw(
AttrVal
CommandAttr
readingsSingleUpdate
Log3
DoSet
))
};
sub Get($$@) {
my ($hash, $name, $command) = @_; my ($hash, $name, $command) = @_;
return "Need at least one parameters" unless (defined $command); return "Need at least one parameters" unless (defined $command);
return "Unknown argument $command, choose one of " . join(" ", sort keys %gets) return "Unknown argument $command, choose one of " . join(" ", sort keys %gets)
@ -73,8 +89,8 @@ sub MQTT_BRIDGE_Get($$@) {
$command eq "readings" and do { $command eq "readings" and do {
my $base = AttrVal($name,"publish-topic-base","/$hash->{DEF}/"); my $base = AttrVal($name,"publish-topic-base","/$hash->{DEF}/");
foreach my $reading (keys %{$main::defs{$hash->{DEF}}{READINGS}}) { foreach my $reading (keys %{$main::defs{$hash->{DEF}}{READINGS}}) {
unless (defined main::AttrVal($name,"publishReading_$reading",undef)) { unless (defined AttrVal($name,"publishReading_$reading",undef)) {
main::CommandAttr($hash,"$name publishReading_$reading $base$reading"); CommandAttr($hash,"$name publishReading_$reading $base$reading");
} }
}; };
last; last;
@ -82,28 +98,28 @@ sub MQTT_BRIDGE_Get($$@) {
}; };
} }
sub MQTT_BRIDGE_Notify() { sub Notify() {
my ($hash,$dev) = @_; my ($hash,$dev) = @_;
main::Log3($hash->{NAME},5,"Notify for $dev->{NAME}"); Log3($hash->{NAME},5,"Notify for $dev->{NAME}");
foreach my $event (@{$dev->{CHANGED}}) { foreach my $event (@{$dev->{CHANGED}}) {
$event =~ /^([^:]+)(: )?(.*)$/; $event =~ /^([^:]+)(: )?(.*)$/;
main::Log3($hash->{NAME},5,"$event, '".((defined $1) ? $1 : "-undef-")."', '".((defined $3) ? $3 : "-undef-")."'"); Log3($hash->{NAME},5,"$event, '".((defined $1) ? $1 : "-undef-")."', '".((defined $3) ? $3 : "-undef-")."'");
if (defined $3 and $3 ne "") { if (defined $3 and $3 ne "") {
if (defined $hash->{publishReadings}->{$1}) { if (defined $hash->{publishReadings}->{$1}) {
MQTT_send_publish($hash->{IODev}, topic => $hash->{publishReadings}->{$1}, message => $3, qos => $hash->{qos}); send_publish($hash->{IODev}, topic => $hash->{publishReadings}->{$1}, message => $3, qos => $hash->{qos});
readingsSingleUpdate($hash,"transmission-state","publish sent",1); readingsSingleUpdate($hash,"transmission-state","publish sent",1);
} }
} else { } else {
if (defined $hash->{publishState}) { if (defined $hash->{publishState}) {
MQTT_send_publish($hash->{IODev}, topic => $hash->{publishState}, message => $1, qos => $hash->{qos}); send_publish($hash->{IODev}, topic => $hash->{publishState}, message => $1, qos => $hash->{qos});
readingsSingleUpdate($hash,"transmission-state","publish sent",1); readingsSingleUpdate($hash,"transmission-state","publish sent",1);
} }
} }
} }
} }
sub MQTT_BRIDGE_Attr($$$$) { sub Attr($$$$) {
my ($command,$name,$attribute,$value) = @_; my ($command,$name,$attribute,$value) = @_;
my $hash = $main::defs{$name}; my $hash = $main::defs{$name};
@ -114,7 +130,7 @@ sub MQTT_BRIDGE_Attr($$$$) {
push @{$hash->{subscribe}},$value unless grep {$_ eq $value} @{$hash->{subscribe}}; push @{$hash->{subscribe}},$value unless grep {$_ eq $value} @{$hash->{subscribe}};
if ($main::init_done) { if ($main::init_done) {
if (my $mqtt = $hash->{IODev}) {; if (my $mqtt = $hash->{IODev}) {;
my $msgid = MQTT_send_subscribe($mqtt, my $msgid = send_subscribe($mqtt,
topics => [[$value => $hash->{qos} || MQTT_QOS_AT_MOST_ONCE]], topics => [[$value => $hash->{qos} || MQTT_QOS_AT_MOST_ONCE]],
); );
$hash->{message_ids}->{$msgid}++; $hash->{message_ids}->{$msgid}++;
@ -128,7 +144,7 @@ sub MQTT_BRIDGE_Attr($$$$) {
$hash->{subscribe} = [grep { $_ != $topic } @{$hash->{subscribe}}]; $hash->{subscribe} = [grep { $_ != $topic } @{$hash->{subscribe}}];
if ($main::init_done) { if ($main::init_done) {
if (my $mqtt = $hash->{IODev}) {; if (my $mqtt = $hash->{IODev}) {;
my $msgid = MQTT_send_unsubscribe($mqtt, my $msgid = send_unsubscribe($mqtt,
topics => [$topic], topics => [$topic],
); );
$hash->{message_ids}->{$msgid}++; $hash->{message_ids}->{$msgid}++;
@ -156,33 +172,20 @@ sub MQTT_BRIDGE_Attr($$$$) {
} }
last; last;
}; };
$attribute eq "qos" and do { client_attr($hash,$command,$name,$attribute,$value);
if ($command eq "set") {
$hash->{qos} = $qos{$value};
} else {
$hash->{qos} = MQTT_QOS_AT_MOST_ONCE;
}
last;
};
$attribute eq "IODev" and do {
if ($command eq "set") {
} else {
}
last;
};
} }
} }
sub MQTT_BRIDGE_onmessage($$$) { sub onmessage($$$) {
my ($hash,$topic,$message) = @_; my ($hash,$topic,$message) = @_;
if (defined (my $command = $hash->{subscribeSets}->{$topic})) { if (defined (my $command = $hash->{subscribeSets}->{$topic})) {
my @args = split ("[ \t]+",$message); my @args = split ("[ \t]+",$message);
if ($command eq "") { if ($command eq "") {
main::Log3($hash->{NAME},5,"calling DoSet($hash->{DEF}".(@args ? ",".join(",",@args) : "")); Log3($hash->{NAME},5,"calling DoSet($hash->{DEF}".(@args ? ",".join(",",@args) : ""));
main::DoSet($hash->{DEF},@args); DoSet($hash->{DEF},@args);
} else { } else {
main::Log3($hash->{NAME},5,"calling DoSet($hash->{DEF},$command".(@args ? ",".join(",",@args) : "")); Log3($hash->{NAME},5,"calling DoSet($hash->{DEF},$command".(@args ? ",".join(",",@args) : ""));
main::DoSet($hash->{DEF},$command,@args); DoSet($hash->{DEF},$command,@args);
} }
} }
} }

View File

@ -25,10 +25,6 @@
use strict; use strict;
use warnings; use warnings;
use GPUtils qw(:all);
use Net::MQTT::Constants;
use Net::MQTT::Message;
my %sets = ( my %sets = (
); );
@ -37,28 +33,45 @@ my %gets = (
"version" => "", "version" => "",
); );
my %qos = map {qos_string($_) => $_} (MQTT_QOS_AT_MOST_ONCE,MQTT_QOS_AT_LEAST_ONCE,MQTT_QOS_EXACTLY_ONCE);
sub MQTT_DEVICE_Initialize($) { sub MQTT_DEVICE_Initialize($) {
my $hash = shift @_; my $hash = shift @_;
# Consumer # Consumer
$hash->{DefFn} = "MQTT_client_define"; $hash->{DefFn} = "MQTT::Client_Define";
$hash->{UndefFn} = "MQTT_client_undefine"; $hash->{UndefFn} = "MQTT::Client_Undefine";
$hash->{SetFn} = "MQTT_DEVICE_Set"; $hash->{SetFn} = "MQTT::DEVICE::Set";
$hash->{AttrFn} = "MQTT_DEVICE_Attr"; $hash->{AttrFn} = "MQTT::DEVICE::Attr";
$hash->{AttrList} = $hash->{AttrList} =
"IODev ". "IODev ".
"qos:".join(",",keys %qos)." ". "qos:".join(",",keys %MQTT::qos)." ".
"publishSet ". "publishSet ".
"publishSet_.* ". "publishSet_.* ".
"subscribeReading_.* ". "subscribeReading_.* ".
$main::readingFnAttributes; $main::readingFnAttributes;
main::LoadModule("MQTT");
} }
sub MQTT_DEVICE_Set($@) { package MQTT::DEVICE;
use strict;
use warnings;
use GPUtils qw(:all);
use Net::MQTT::Constants;
BEGIN {
MQTT->import(qw(:all));
GP_Import(qw(
readingsSingleUpdate
Log3
))
};
sub Set($@) {
my ($hash, @a) = @_; my ($hash, @a) = @_;
return "Need at least one parameters" if(@a < 2); return "Need at least one parameters" if(@a < 2);
return "Unknown argument $a[1], choose one of " . join(" ", map {$sets{$_} eq "" ? $_ : "$_:$sets{$_}"} sort keys %sets) return "Unknown argument $a[1], choose one of " . join(" ", map {$sets{$_} eq "" ? $_ : "$_:$sets{$_}"} sort keys %sets)
@ -66,16 +79,16 @@ sub MQTT_DEVICE_Set($@) {
my $command = $a[1]; my $command = $a[1];
my $value = $a[2]; my $value = $a[2];
if (defined $value) { if (defined $value) {
MQTT_send_publish($hash->{IODev}, topic => $hash->{publishSets}->{$command}->{topic}, message => $value, qos => $hash->{qos}); send_publish($hash->{IODev}, topic => $hash->{publishSets}->{$command}->{topic}, message => $value, qos => $hash->{qos});
readingsSingleUpdate($hash,$command,$value,1); readingsSingleUpdate($hash,$command,$value,1);
} else { } else {
MQTT_send_publish($hash->{IODev}, topic => $hash->{publishSets}->{""}->{topic}, message => $command, qos => $hash->{qos}); send_publish($hash->{IODev}, topic => $hash->{publishSets}->{""}->{topic}, message => $command, qos => $hash->{qos});
readingsSingleUpdate($hash,"state",$command,1); readingsSingleUpdate($hash,"state",$command,1);
} }
return undef; return undef;
} }
sub MQTT_DEVICE_Attr($$$$) { sub Attr($$$$) {
my ($command,$name,$attribute,$value) = @_; my ($command,$name,$attribute,$value) = @_;
my $hash = $main::defs{$name}; my $hash = $main::defs{$name};
@ -86,7 +99,7 @@ sub MQTT_DEVICE_Attr($$$$) {
push @{$hash->{subscribe}},$value unless grep {$_ eq $value} @{$hash->{subscribe}}; push @{$hash->{subscribe}},$value unless grep {$_ eq $value} @{$hash->{subscribe}};
if ($main::init_done) { if ($main::init_done) {
if (my $mqtt = $hash->{IODev}) {; if (my $mqtt = $hash->{IODev}) {;
my $msgid = MQTT_send_subscribe($mqtt, my $msgid = send_subscribe($mqtt,
topics => [[$value => $hash->{qos} || MQTT_QOS_AT_MOST_ONCE]], topics => [[$value => $hash->{qos} || MQTT_QOS_AT_MOST_ONCE]],
); );
$hash->{message_ids}->{$msgid}++; $hash->{message_ids}->{$msgid}++;
@ -100,7 +113,7 @@ sub MQTT_DEVICE_Attr($$$$) {
delete $hash->{subscribeReadings}->{$topic}; delete $hash->{subscribeReadings}->{$topic};
if ($main::init_done) { if ($main::init_done) {
if (my $mqtt = $hash->{IODev}) {; if (my $mqtt = $hash->{IODev}) {;
my $msgid = MQTT_send_unsubscribe($mqtt, my $msgid = send_unsubscribe($mqtt,
topics => [$topic], topics => [$topic],
); );
$hash->{message_ids}->{$msgid}++; $hash->{message_ids}->{$msgid}++;
@ -139,28 +152,15 @@ sub MQTT_DEVICE_Attr($$$$) {
} }
last; last;
}; };
$attribute eq "qos" and do { client_attr($hash,$command,$name,$attribute,$value);
if ($command eq "set") {
$hash->{qos} = $qos{$value};
} else {
$hash->{qos} = MQTT_QOS_AT_MOST_ONCE;
}
last;
};
$attribute eq "IODev" and do {
if ($command eq "set") {
} else {
}
last;
};
} }
} }
sub MQTT_DEVICE_onmessage($$$) { sub onmessage($$$) {
my ($hash,$topic,$message) = @_; my ($hash,$topic,$message) = @_;
if (defined (my $reading = $hash->{subscribeReadings}->{$topic})) { if (defined (my $reading = $hash->{subscribeReadings}->{$topic})) {
main::Log3($hash->{NAME},5,"calling readingsSingleUpdate($hash->{NAME},$reading,$message,1"); Log3($hash->{NAME},5,"calling readingsSingleUpdate($hash->{NAME},$reading,$message,1");
main::readingsSingleUpdate($hash,$reading,$message,1); readingsSingleUpdate($hash,$reading,$message,1);
} }
} }

View File

@ -7,7 +7,7 @@ use Exporter qw( import );
use strict; use strict;
use warnings; use warnings;
our %EXPORT_TAGS = (all => [qw(GP_Define GP_Catch GP_ForallClients)]); our %EXPORT_TAGS = (all => [qw(GP_Define GP_Catch GP_ForallClients GP_Import)]);
Exporter::export_ok_tags('all'); Exporter::export_ok_tags('all');
#add FHEM/lib to @INC if it's not allready included. Should rather be in fhem.pl than here though... #add FHEM/lib to @INC if it's not allready included. Should rather be in fhem.pl than here though...
@ -54,5 +54,14 @@ sub GP_ForallClients($$@)
return undef; return undef;
} }
sub GP_Import(@)
{
no strict qw/refs/; ## no critic
my $pkg = caller(0);
foreach (@_) {
*{$pkg.'::'.$_} = *{'main::'.$_};
}
}
1; 1;