Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQTT subscription #361

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -616,4 +616,4 @@ RUBY VERSION
ruby 3.0.6p216

BUNDLED WITH
2.5.6
2.5.21
13 changes: 11 additions & 2 deletions app/models/component.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,17 @@ class Component < ActiveRecord::Base
belongs_to :sensor

validates_presence_of :device, :sensor
validates :sensor_id, :uniqueness => { :scope => [:device_id] }
validates :key, :uniqueness => { :scope => [:device_id] }

# IMPORTANT: Validation of sensor/device uniqueness is done at the database level,
# as this allows us to use the create_or_find_by! method to atomically upsert components
# in the mqtt-task, avoiding component duplication due to race conditions.
# For some reason, create_or_find_by! ONLY works when the database constraint is
# the ONLY uniqueness constraint on those two values, so adding a rails validation here
# causes an error. Leaving the validations here commented out by way of documentation.
# See https://stackoverflow.com/questions/74566974/create-or-find-by-not-working-as-it-should-in-rails-6
# validates :sensor_id, :uniqueness => { :scope => [:device_id] }
# validates :key, :uniqueness => { :scope => [:device_id] }


before_validation :set_key, on: :create

Expand Down
2 changes: 1 addition & 1 deletion app/models/concerns/data_parser/storer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def timestamp_parse(timestamp)
end

def sensor_reading(device, sensor)
component = device.find_or_create_component_for_sensor_reading(sensor)
component = device.create_or_find_component_for_sensor_reading(sensor)
return nil if component.nil?
value = component.normalized_value( (Float(sensor['value']) rescue sensor['value']) )
{
Expand Down
19 changes: 11 additions & 8 deletions app/models/device.rb
Original file line number Diff line number Diff line change
Expand Up @@ -103,25 +103,25 @@ def sensor_map
components.map { |c| [c.key, c.sensor.id]}.to_h
end

def find_or_create_component_by_sensor_id(sensor_id)
def create_or_find_component_by_sensor_id(sensor_id)
return nil if sensor_id.nil? || !Sensor.exists?(id: sensor_id)
components.find_or_create_by(sensor_id: sensor_id)
components.create_or_find_by!(sensor_id: sensor_id)
end

def find_or_create_component_by_sensor_key(sensor_key)
def create_or_find_component_by_sensor_key(sensor_key)
return nil if sensor_key.nil?
sensor = Sensor.find_by(default_key: sensor_key)
return nil if sensor.nil?
components.find_or_create_by(sensor_id: sensor.id)
components.create_or_find_by!(sensor_id: sensor.id)
end

def find_or_create_component_for_sensor_reading(reading)
def create_or_find_component_for_sensor_reading(reading)
key_or_id = reading["id"]
if key_or_id.is_a?(Integer) || key_or_id =~ /\d+/
# It's an integer and therefore a sensor id
find_or_create_component_by_sensor_id(key_or_id)
create_or_find_component_by_sensor_id(key_or_id)
else
find_or_create_component_by_sensor_key(key_or_id)
create_or_find_component_by_sensor_key(key_or_id)
end
end

Expand Down Expand Up @@ -245,7 +245,10 @@ def remove_mac_address_for_newly_registered_device!

def update_component_timestamps(timestamp, sensor_ids)
components.select {|c| sensor_ids.include?(c.sensor_id) }.each do |component|
component.update_column(:last_reading_at, timestamp)
component.lock! if self.class.connection.transaction_open?
if !component.reload.last_reading_at || timestamp > component.last_reading_at
component.update_column(:last_reading_at, timestamp)
end
end
end

Expand Down
2 changes: 1 addition & 1 deletion app/models/raw_storer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def store data, mac, version, ip, raise_errors=false

metric_id = device.find_sensor_id_by_key(metric)

component = device.find_or_create_component_by_sensor_id(metric_id)
component = device.create_or_find_component_by_sensor_id(metric_id)
next if component.nil?

value = component.normalized_value( (Float(value) rescue value) )
Expand Down
22 changes: 12 additions & 10 deletions app/models/storer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,20 @@ def store device, reading, do_update = true

def update_device(device, parsed_ts, sql_data)
return if parsed_ts <= Time.at(0)
device.transaction do
device.lock!
if device.reload.last_reading_at.present?
# Comparison errors if device.last_reading_at is nil (new devices).
# Devices can post multiple readings, in a non-sorted order.
# Do not update data with an older timestamp.
return if parsed_ts < device.last_reading_at
end

if device.last_reading_at.present?
# Comparison errors if device.last_reading_at is nil (new devices).
# Devices can post multiple readings, in a non-sorted order.
# Do not update data with an older timestamp.
return if parsed_ts < device.last_reading_at
sql_data = device.data.present? ? device.data.merge(sql_data) : sql_data
device.update_columns(last_reading_at: parsed_ts, data: sql_data, state: 'has_published')
sensor_ids = sql_data.select { |k, v| k.is_a?(Integer) }.keys.compact.uniq
device.update_component_timestamps(parsed_ts, sensor_ids)
end

sql_data = device.data.present? ? device.data.merge(sql_data) : sql_data
device.update_columns(last_reading_at: parsed_ts, data: sql_data, state: 'has_published')
sensor_ids = sql_data.select { |k, v| k.is_a?(Integer) }.keys.compact.uniq
device.update_component_timestamps(parsed_ts, sensor_ids)
end

def kairos_publish(reading_data)
Expand Down
6 changes: 5 additions & 1 deletion compose.override.local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ services:
restart: "no"
sidekiq:
restart: "no"
mqtt-task:
mqtt-task-main-1:
restart: "no"
mqtt-task-main-2:
restart: "no"
mqtt-task-secondary:
restart: "no"
telnet-task:
restart: "no"
Expand Down
4 changes: 3 additions & 1 deletion compose/app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ services:
- auth
- redis
- sidekiq
- mqtt-task
- mqtt-task-main-1
- mqtt-task-main-2
- mqtt-task-secondary
- telnet-task
#- mqtt
restart: always
Expand Down
1 change: 1 addition & 0 deletions compose/db.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
services:
db:
image: postgres:10
command: -c max_connections=200
volumes:
- sck-postgres:/var/lib/postgresql/data
env_file: ../.env
Expand Down
14 changes: 14 additions & 0 deletions compose/mqtt-task-common.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
services:
mqtt-task:
build: ../
env_file: ../.env
command: ./mqtt_subscriber.sh
restart: always
volumes:
- "../log:/app/log"
logging:
driver: "json-file"
options:
max-size: "100m"
environment:
db_pool_size: 5
39 changes: 26 additions & 13 deletions compose/mqtt-task.yml
Original file line number Diff line number Diff line change
@@ -1,14 +1,27 @@
services:
mqtt-task:
build: ../
env_file: ../.env
command: bundle exec rake mqtt:sub
restart: always
volumes:
- "../log:/app/log"
logging:
driver: "json-file"
options:
max-size: "100m"
environment:
db_pool_size: 5
mqtt-task-main-1:
extends:
file: mqtt-task-common.yml
service: mqtt-task
environment:
MQTT_CLIENT_ID: smartcitizen-api-staging-main-1
MQTT_CLEAN_SESSION: false
mqtt-task-main-2:
extends:
file: mqtt-task-common.yml
service: mqtt-task
environment:
MQTT_CLIENT_ID: smartcitizen-api-staging-main-2
MQTT_CLEAN_SESSION: false
mqtt-task-secondary:
extends:
file: mqtt-task-common.yml
service: mqtt-task
environment:
MQTT_CLIENT_ID: "smartcitizen-api-staging-secondary"
MQTT_CLEAN_SESSION: true
deploy:
mode: replicated
replicas: 2


2 changes: 1 addition & 1 deletion compose/telnet-task.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ services:
command: bundle exec rake telnet:push
restart: always
environment:
db_pool_size: 5
db_pool_size: 2
23 changes: 23 additions & 0 deletions db/migrate/20241009174732_unique_index_on_components.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
class UniqueIndexOnComponents < ActiveRecord::Migration[6.1]
def up
remove_index :components, [:device_id, :sensor_id]
add_index :components, [:device_id, :sensor_id], unique: true
execute %{
ALTER TABLE components ADD CONSTRAINT unique_sensor_for_device UNIQUE (device_id, sensor_id)
}
execute %{
ALTER TABLE components ADD CONSTRAINT unique_key_for_device UNIQUE (device_id, key)
}
end

def down
execute %{
ALTER TABLE components DROP CONSTRAINT IF EXISTS unique_key_for_device
}
execute %{
ALTER TABLE components DROP CONSTRAINT IF EXISTS unique_sensor_for_device
}
remove_index :components, [:device_id, :sensor_id], unique: true
add_index :components, [:device_id, :sensor_id]
end
end
6 changes: 4 additions & 2 deletions db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema.define(version: 2024_08_12_081108) do
ActiveRecord::Schema.define(version: 2024_10_09_174732) do
# These are extensions that must be enabled in order to support this database
enable_extension "adminpack"
enable_extension "hstore"
Expand Down Expand Up @@ -65,7 +65,9 @@
t.string "key"
t.integer "bus", default: 1, null: false
t.datetime "last_reading_at"
t.index ["device_id", "sensor_id"], name: "index_components_on_device_id_and_sensor_id"
t.index ["device_id", "key"], name: "unique_key_for_device", unique: true
t.index ["device_id", "sensor_id"], name: "index_components_on_device_id_and_sensor_id", unique: true
t.index ["device_id", "sensor_id"], name: "unique_sensor_for_device", unique: true
end

create_table "devices", id: :serial, force: :cascade do |t|
Expand Down
1 change: 1 addition & 0 deletions env.example
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ REDIS_STORE=redis://redis:6379/3

# MQTT Settings
MQTT_HOST=mqtt
#MQTT_SHARED_SUBSCRIPTION_GROUP="group1"
#MQTT_CLEAN_SESSION=true
#MQTT_CLIENT_ID=some_id
#MQTT_PORT=port
Expand Down
32 changes: 24 additions & 8 deletions lib/tasks/mqtt_subscriber.rake
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,23 @@ namespace :mqtt do
task sub: :environment do
pid_file = Rails.root.join('tmp/pids/mqtt_subscriber.pid')
File.open(pid_file, 'w') { |f| f.puts Process.pid }
mqtt_log = Logger.new('log/mqtt.log', 5, 100.megabytes)

mqtt_clean_session = ENV.has_key?('MQTT_CLEAN_SESSION') ? ENV['MQTT_CLEAN_SESSION'] == "true" : true
mqtt_client_id = ENV.has_key?('MQTT_CLIENT_ID') ? ENV['MQTT_CLIENT_ID'] : nil
mqtt_host = ENV.has_key?('MQTT_HOST') ? ENV['MQTT_HOST'] : 'mqtt'
mqtt_port = ENV.has_key?('MQTT_PORT') ? ENV['MQTT_PORT'] : 1883
mqtt_ssl = ENV.has_key?('MQTT_SSL') ? ENV['MQTT_SSL'] : false
mqtt_shared_subscription_group = ENV.fetch("MQTT_SHARED_SUBSCRIPTION_GROUP", nil)
mqtt_queue_length_warning_threshold = ENV.fetch("MQTT_QUEUE_LENGTH_WARNING_THRESHOLD", "30").to_i

mqtt_topics_string = ENV.fetch('MQTT_TOPICS', '')
mqtt_topics = mqtt_topics_string.include?(",") ? mqtt_topics_string.split(",") : [ mqtt_topics_string ]

if mqtt_shared_subscription_group && mqtt_clean_session
mqtt_client_id += "-#{ENV.fetch("HOSTNAME")}"
end

mqtt_log = Logger.new("log/mqtt-#{mqtt_client_id}.log", 5, 100.megabytes)
mqtt_log.info('MQTT TASK STARTING')
mqtt_log.info("clean_session: #{mqtt_clean_session}")
mqtt_log.info("client_id: #{mqtt_client_id}")
Expand All @@ -32,18 +40,18 @@ namespace :mqtt do
mqtt_log.info "Using clean_session setting: #{client.clean_session}"

message_handler = MqttMessagesHandler.new

prefix = mqtt_shared_subscription_group ? "$share/#{mqtt_shared_subscription_group}" : "$queue"
client.subscribe(*mqtt_topics.flat_map { |topic|
topic = topic == "" ? topic : topic + "/"
[
"$queue/#{topic}device/sck/+/readings" => 2,
"$queue/#{topic}device/sck/+/readings/raw" => 2,
"$queue/#{topic}device/sck/+/hello" => 2,
"$queue/#{topic}device/sck/+/info" => 2,
"$queue/#{topic}device/inventory" => 2
"#{prefix}/#{topic}device/sck/+/readings" => 2,
"#{prefix}/#{topic}device/sck/+/readings/raw" => 2,
"#{prefix}/#{topic}device/sck/+/hello" => 2,
"#{prefix}/#{topic}device/sck/+/info" => 2,
"#{prefix}/#{topic}device/inventory" => 2
]
})

threshold_passed = false
client.get do |topic, message|
Sentry.with_scope do
begin
Expand All @@ -52,6 +60,14 @@ namespace :mqtt do
end
mqtt_log.info "Processed MQTT message in #{time}"
mqtt_log.info "MQTT queue length: #{client.queue_length}"
if client.queue_length >= mqtt_queue_length_warning_threshold
if !threshold_passed
Sentry.capture_message("Warning: Internal MQTT queue length is #{client.queue_length} (>= #{mqtt_queue_length_warning_threshold} on client #{mqtt_client_id}).")
threshold_passed = true
end
else
threshold_passed = false
end
rescue Exception => e
mqtt_log.info e
Sentry.capture_exception(e)
Expand Down
2 changes: 2 additions & 0 deletions mqtt_subscriber.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#!/bin/bash
bundle exec rake mqtt:sub
2 changes: 1 addition & 1 deletion spec/models/component_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

it "validates uniqueness of board to sensor" do
component = create(:component, device: create(:device), sensor: create(:sensor))
expect{ create(:component, device: component.device, sensor: component.sensor) }.to raise_error(ActiveRecord::RecordInvalid)
expect{ create(:component, device: component.device, sensor: component.sensor) }.to raise_error(ActiveRecord::RecordNotUnique)
end

describe "creating a unique sensor key" do
Expand Down
10 changes: 5 additions & 5 deletions spec/models/device_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -635,19 +635,19 @@
end
end

describe "#find_or_create_component_by_sensor_id" do
describe "#create_or_find_component_by_sensor_id" do
context "when the sensor exists and a component already exists for this device" do
it "returns the existing component" do
sensor = create(:sensor)
component = create(:component, sensor: sensor, device: device)
expect(device.find_or_create_component_by_sensor_id(sensor.id)).to eq(component)
expect(device.create_or_find_component_by_sensor_id(sensor.id)).to eq(component)
end
end

context "when the sensor exists and a component does not already exist for this device" do
it "returns a new valid component with the correct sensor and device" do
sensor = create(:sensor)
component = device.find_or_create_component_by_sensor_id(sensor.id)
component = device.create_or_find_component_by_sensor_id(sensor.id)
expect(component).not_to be_blank
expect(component).to be_a Component
expect(component.valid?).to be(true)
Expand All @@ -660,13 +660,13 @@
context "when no sensor exists with this id" do
it "returns nil" do
create(:sensor, id: 12345)
expect(device.find_or_create_component_by_sensor_id(54321)).to be_blank
expect(device.create_or_find_component_by_sensor_id(54321)).to be_blank
end
end

context "when the id is nil" do
it "returns nil" do
expect(device.find_or_create_component_by_sensor_id(nil)).to be_blank
expect(device.create_or_find_component_by_sensor_id(nil)).to be_blank
end
end

Expand Down
1 change: 0 additions & 1 deletion spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# users commonly want.
#
# See http://rubydoc.info/gems/rspec-core/RSpec/Core/Configuration

RSpec.configure do |config|
# rspec-expectations config goes here. You can use an alternate
# assertion/expectation library such as wrong or the stdlib/minitest
Expand Down
Loading