MQTT_DEVICE: new attribute 'autoSubscribeReadings'

git-svn-id: https://svn.fhem.de/fhem/trunk@6708 2b470e98-0d58-463d-a4d8-8e2adae1ed80
This commit is contained in:
ntruchsess 2014-10-08 13:07:07 +00:00
parent 4c47844ad9
commit 4ccf88c21e
3 changed files with 93 additions and 39 deletions

View File

@ -61,7 +61,7 @@ package MQTT;
use Exporter ('import'); use Exporter ('import');
@EXPORT = (); @EXPORT = ();
@EXPORT_OK = qw(send_publish send_subscribe send_unsubscribe client_attr); @EXPORT_OK = qw(send_publish send_subscribe send_unsubscribe client_attr client_subscribe_topic client_unsubscribe_topic topic_to_regexp);
%EXPORT_TAGS = (all => [@EXPORT_OK]); %EXPORT_TAGS = (all => [@EXPORT_OK]);
use strict; use strict;
@ -195,7 +195,7 @@ sub Read {
GP_ForallClients($hash,sub { GP_ForallClients($hash,sub {
my $client = shift; my $client = shift;
Log3($client->{NAME},5,"publish received for $topic, ".$mqtt->message()); Log3($client->{NAME},5,"publish received for $topic, ".$mqtt->message());
if (grep { $_ eq $topic } @{$client->{subscribe}}) { if (grep { $topic =~ $_ } @{$client->{subscribeExpr}}) {
readingsSingleUpdate($client,"transmission-state","publish received",1); readingsSingleUpdate($client,"transmission-state","publish received",1);
if ($client->{TYPE} eq "MQTT_DEVICE") { if ($client->{TYPE} eq "MQTT_DEVICE") {
MQTT::DEVICE::onmessage($client,$topic,$mqtt->message()); MQTT::DEVICE::onmessage($client,$topic,$mqtt->message());
@ -327,12 +327,55 @@ sub send_message($$$@) {
return $msgid; return $msgid;
}; };
sub topic_to_regexp($) {
my $t = shift;
$t =~ s|#$|.\*|;
$t =~ s|\/\.\*$|.\*|;
$t =~ s|\/|\\\/|g;
$t =~ s|(\+)([^+]*$)|(+)$2|;
$t =~ s|\+|[^\/]+|g;
return "^$t\$";
}
sub client_subscribe_topic($$) {
my ($client,$topic) = @_;
push @{$client->{subscribe}},$topic unless grep {$_ eq $topic} @{$client->{subscribe}};
my $expr = topic_to_regexp($topic);
push @{$client->{subscribeExpr}},$expr unless grep {$_ eq $expr} @{$client->{subscribeExpr}};
if ($main::init_done) {
if (my $mqtt = $client->{IODev}) {;
my $msgid = send_subscribe($mqtt,
topics => [[$topic => $client->{qos} || MQTT_QOS_AT_MOST_ONCE]],
);
$client->{message_ids}->{$msgid}++;
readingsSingleUpdate($client,"transmission-state","subscribe sent",1)
}
}
};
sub client_unsubscribe_topic($$) {
my ($client,$topic) = @_;
$client->{subscribe} = [grep { $_ ne $topic } @{$client->{subscribe}}];
my $expr = topic_to_regexp($topic);
$client->{subscribeExpr} = [grep { $_ ne $expr} @{$client->{subscribeExpr}}];
if ($main::init_done) {
if (my $mqtt = $client->{IODev}) {;
my $msgid = send_unsubscribe($mqtt,
topics => [$topic],
);
$client->{message_ids}->{$msgid}++;
readingsSingleUpdate($client,"transmission-state","unsubscribe sent",1)
}
}
};
sub Client_Define($$) { sub Client_Define($$) {
my ( $client, $def ) = @_; my ( $client, $def ) = @_;
$client->{NOTIFYDEV} = $client->{DEF} if $client->{DEF}; $client->{NOTIFYDEV} = $client->{DEF} if $client->{DEF};
$client->{qos} = MQTT_QOS_AT_MOST_ONCE; $client->{qos} = MQTT_QOS_AT_MOST_ONCE;
$client->{subscribe} = []; $client->{subscribe} = [];
$client->{subscribeExpr} = [];
if ($main::init_done) { if ($main::init_done) {
return client_start($client); return client_start($client);

View File

@ -126,30 +126,17 @@ sub Attr($$$$) {
ATTRIBUTE_HANDLER: { ATTRIBUTE_HANDLER: {
$attribute =~ /^subscribeSet(_?)(.*)/ and do { $attribute =~ /^subscribeSet(_?)(.*)/ and do {
if ($command eq "set") { if ($command eq "set") {
$hash->{subscribeSets}->{$value} = $2; unless (defined $hash->{subscribeSets}->{$value} and $hash->{subscribeSets}->{$value} eq $2) {
push @{$hash->{subscribe}},$value unless grep {$_ eq $value} @{$hash->{subscribe}}; unless (defined $hash->{subscribeSets}->{$value}) {
if ($main::init_done) { client_subscribe_topic($hash,$value);
if (my $mqtt = $hash->{IODev}) {;
my $msgid = send_subscribe($mqtt,
topics => [[$value => $hash->{qos} || MQTT_QOS_AT_MOST_ONCE]],
);
$hash->{message_ids}->{$msgid}++;
readingsSingleUpdate($hash,"transmission-state","subscribe sent",1)
} }
$hash->{subscribeSets}->{$value} = $2;
} }
} else { } else {
foreach my $topic (keys %{$hash->{subscribeSets}}) { foreach my $topic (keys %{$hash->{subscribeSets}}) {
if ($hash->{subscribeSets}->{topic} eq $2) { if ($hash->{subscribeSets}->{$topic} eq $2) {
client_unsubscribe_topic($hash,$topic);
delete $hash->{subscribeSets}->{$topic}; delete $hash->{subscribeSets}->{$topic};
$hash->{subscribe} = [grep { $_ ne $topic } @{$hash->{subscribe}}];
if ($main::init_done) {
if (my $mqtt = $hash->{IODev}) {;
my $msgid = send_unsubscribe($mqtt,
topics => [$topic],
);
$hash->{message_ids}->{$msgid}++;
}
}
last; last;
} }
} }

View File

@ -49,6 +49,7 @@ sub MQTT_DEVICE_Initialize($) {
"publishSet ". "publishSet ".
"publishSet_.* ". "publishSet_.* ".
"subscribeReading_.* ". "subscribeReading_.* ".
"autoSubscribeReadings ".
$main::readingFnAttributes; $main::readingFnAttributes;
main::LoadModule("MQTT"); main::LoadModule("MQTT");
@ -66,6 +67,8 @@ BEGIN {
MQTT->import(qw(:all)); MQTT->import(qw(:all));
GP_Import(qw( GP_Import(qw(
CommandDeleteReading
CommandAttr
readingsSingleUpdate readingsSingleUpdate
Log3 Log3
)) ))
@ -95,36 +98,43 @@ sub Attr($$$$) {
ATTRIBUTE_HANDLER: { ATTRIBUTE_HANDLER: {
$attribute =~ /^subscribeReading_(.+)/ and do { $attribute =~ /^subscribeReading_(.+)/ and do {
if ($command eq "set") { if ($command eq "set") {
$hash->{subscribeReadings}->{$value} = $1; unless (defined $hash->{subscribeReadings}->{$value} and $hash->{subscribeReadings}->{$value} eq $1) {
push @{$hash->{subscribe}},$value unless grep {$_ eq $value} @{$hash->{subscribe}}; unless (defined $hash->{subscribeReadings}->{$value}) {
if ($main::init_done) { client_subscribe_topic($hash,$value);
if (my $mqtt = $hash->{IODev}) {;
my $msgid = send_subscribe($mqtt,
topics => [[$value => $hash->{qos} || MQTT_QOS_AT_MOST_ONCE]],
);
$hash->{message_ids}->{$msgid}++;
readingsSingleUpdate($hash,"transmission-state","subscribe sent",1)
} }
$hash->{subscribeReadings}->{$value} = $1;
} }
} else { } else {
foreach my $topic (keys %{$hash->{subscribeReadings}}) { foreach my $topic (keys %{$hash->{subscribeReadings}}) {
if ($hash->{subscribeReadings}->{$topic} eq $1) { if ($hash->{subscribeReadings}->{$topic} eq $1) {
$hash->{subscribe} = [grep { $_ ne $topic } @{$hash->{subscribe}}]; client_unsubscribe_topic($hash,$topic);
delete $hash->{subscribeReadings}->{$topic}; delete $hash->{subscribeReadings}->{$topic};
if ($main::init_done) { CommandDeleteReading(undef,"$hash->{NAME} $1");
if (my $mqtt = $hash->{IODev}) {;
my $msgid = send_unsubscribe($mqtt,
topics => [$topic],
);
$hash->{message_ids}->{$msgid}++;
}
}
last; last;
} }
} }
} }
last; last;
}; };
$attribute eq "autoSubscribeReadings" and do {
if ($command eq "set") {
unless (defined $hash->{'.autoSubscribeTopic'} and $hash->{'.autoSubscribeTopic'} eq $value) {
if (defined $hash->{'.autoSubscribeTopic'}) {
client_unsubscribe_topic($hash,$hash->{'.autoSubscribeTopic'});
}
$hash->{'.autoSubscribeTopic'} = $value;
$hash->{'.autoSubscribeExpr'} = topic_to_regexp($value);
client_subscribe_topic($hash,$value);
}
} else {
if (defined $hash->{'.autoSubscribeTopic'}) {
client_unsubscribe_topic($hash,$hash->{'.autoSubscribeTopic'});
delete $hash->{'.autoSubscribeTopic'};
delete $hash->{'.autoSubscribeExpr'};
}
}
last;
};
$attribute =~ /^publishSet(_?)(.*)/ and do { $attribute =~ /^publishSet(_?)(.*)/ and do {
if ($command eq "set") { if ($command eq "set") {
my @values = split ("[ \t]+",$value); my @values = split ("[ \t]+",$value);
@ -146,6 +156,7 @@ sub Attr($$$$) {
delete $sets{$set}; delete $sets{$set};
} }
} else { } else {
CommandDeleteReading(undef,"$hash->{NAME} $2");
delete $sets{$2}; delete $sets{$2};
} }
delete $hash->{publishSets}->{$2}; delete $hash->{publishSets}->{$2};
@ -161,6 +172,10 @@ sub onmessage($$$) {
if (defined (my $reading = $hash->{subscribeReadings}->{$topic})) { if (defined (my $reading = $hash->{subscribeReadings}->{$topic})) {
Log3($hash->{NAME},5,"calling readingsSingleUpdate($hash->{NAME},$reading,$message,1"); Log3($hash->{NAME},5,"calling readingsSingleUpdate($hash->{NAME},$reading,$message,1");
readingsSingleUpdate($hash,$reading,$message,1); readingsSingleUpdate($hash,$reading,$message,1);
} elsif ($topic =~ $hash->{'.autoSubscribeExpr'}) {
Log3($hash->{NAME},5,"calling readingsSingleUpdate($hash->{NAME},$1,$message,1");
CommandAttr(undef,"$hash->{NAME} subscribeReading_$1 $topic");
readingsSingleUpdate($hash,$1,$message,1);
} }
} }
@ -210,6 +225,15 @@ sub onmessage($$$) {
<code>attr &lt;name&gt; publishSet_&lt;reading&gt; [&lt;values&gt] &lt;topic&gt;</code><br> <code>attr &lt;name&gt; publishSet_&lt;reading&gt; [&lt;values&gt] &lt;topic&gt;</code><br>
configures reading that may be used to both set 'reading' (to optionally configured values) and publish to configured topic<br> configures reading that may be used to both set 'reading' (to optionally configured values) and publish to configured topic<br>
</li> </li>
<li>
<code>attr &lt;name&gt; autoSubscribeReadings &lt;topic&gt;</code><br>
specify a mqtt-topic pattern with wildcard (e.c. 'myhouse/kitchen/+') and MQTT_DEVICE automagically creates readings based on the wildcard-match<br>
e.g a message received with topic 'myhouse/kitchen/temperature' would create and update a reading 'temperature'
</li>
<li>
<code>attr &lt;name&gt; subscribeReading_&lt;reading&gt; &lt;topic&gt;</code><br>
mapps a reading to a specific topic. The reading is updated whenever a message to the configured topic arrives<br>
</li>
</ul> </ul>
</ul> </ul>
</ul> </ul>