class MQTT::Client
Constants
- ATTR_DEFAULTS
Default attribute values
- SELECT_TIMEOUT
Timeout between select polls (in seconds)
Attributes
Number of seconds to wait for acknowledgement packets (default is 5 seconds)
Set the ‘Clean Session’ flag when connecting? (default is true)
Client Identifier
Number of seconds to wait for the connection to the server to be established (default is 90 seconds)
Hostname of the remote server
Time (in seconds) between pings to remote server (default is 15 seconds)
Last ping response time
Password to authenticate to the server with
Port number of the remote server
Set to true to enable SSL/TLS encrypted communication
Set to a symbol to use a specific variant of SSL/TLS. Allowed values include:
@example Using TLS 1.0
client = Client.new('mqtt.example.com', :ssl => :TLSv1)
@see OpenSSL::SSL::SSLContext::METHODS
Username to authenticate to the server with
Set to false to skip tls hostname verification
The version number of the MQTT protocol to use (default 3.1.1)
Contents of the message that is sent by the server when the client disconnects
The QoS level of the will message sent by the server
Whether the Will message should be retained by the server after it is sent
The topic that the Will message is published to
Public Class Methods
Create and connect a new MQTT Client
Accepts the same arguments as creating a new client. If a block is given, then it will be executed before disconnecting again.
Example:
MQTT::Client.connect('myserver.example.com') do |client|
  # do stuff here
end
# File lib/mqtt/client.rb, line 98
# Build a new client from the given arguments and connect it.
#
# All arguments are forwarded unchanged to MQTT::Client.new. The block,
# if any, is forwarded to the instance-level #connect.
#
# @return [MQTT::Client] the client that was created
def self.connect(*args, &block)
  MQTT::Client.new(*args).tap do |client|
    client.connect(&block)
  end
end
Generate a random client identifier (using the characters 0-9 and a-z)
# File lib/mqtt/client.rb, line 106
# Build a random client identifier made of +prefix+ followed by
# +length+ random characters drawn from 0-9 and a-z.
#
# @param prefix [String] text the identifier starts with (not modified)
# @param length [Integer] number of random characters to append
# @return [String] the generated identifier
def self.generate_client_id(prefix = 'ruby', length = 16)
  length.times.reduce(prefix.dup) do |identifier, _|
    # Base-36 rendering of 0..35 yields exactly one of 0-9 / a-z
    identifier + rand(36).to_s(36)
  end
end
Create a new MQTT Client instance
Accepts one of the following:
- a URI that uses the MQTT scheme
- a hostname and port
- a Hash containing attributes to be set on the new instance
If no arguments are given then the method will look for a URI in the MQTT_SERVER environment variable.
Examples:
client = MQTT::Client.new
client = MQTT::Client.new('mqtt://myserver.example.com')
client = MQTT::Client.new('mqtt://user:pass@myserver.example.com')
client = MQTT::Client.new('myserver.example.com')
client = MQTT::Client.new('myserver.example.com', 18830)
client = MQTT::Client.new(:host => 'myserver.example.com')
client = MQTT::Client.new(:host => 'myserver.example.com', :keep_alive => 30)
# File lib/mqtt/client.rb, line 136
# Create a new client instance without connecting.
#
# Positional arguments may be a URI (object or mqtt:// / mqtts:// string)
# or a hostname optionally followed by a port. A trailing Hash supplies
# attribute values; it is copied, so the caller's Hash is never modified.
# With no positional arguments the MQTT_SERVER environment variable is
# used as the server URI when it is set.
#
# @raise [ArgumentError] if three or more positional arguments are given
def initialize(*args)
  # Work on a copy: the merges and assignments below must not leak
  # :host/:port/etc. back into a Hash owned by the caller.
  attributes = args.last.is_a?(Hash) ? args.pop.dup : {}

  # Set server URI from environment if present
  attributes.merge!(parse_uri(ENV['MQTT_SERVER'])) if args.length.zero? && ENV['MQTT_SERVER']

  if args.length >= 1
    case args[0]
    # \A anchors at the start of the string; ^ would also match after
    # an embedded newline.
    when URI, %r{\Amqtts?://}
      attributes.merge!(parse_uri(args[0]))
    else
      attributes[:host] = args[0]
    end
  end

  if args.length >= 2
    attributes[:port] = args[1] unless args[1].nil?
  end

  raise ArgumentError, 'Unsupported number of arguments' if args.length >= 3

  # Merge arguments with default values for attributes
  ATTR_DEFAULTS.merge(attributes).each_pair do |k, v|
    send("#{k}=", v)
  end

  # Set a default port number
  if @port.nil?
    @port = @ssl ? MQTT::DEFAULT_SSL_PORT : MQTT::DEFAULT_PORT
  end

  # OpenSSL is only loaded when TLS is actually requested
  if @ssl
    require 'openssl'
    require 'mqtt/openssl_fix'
  end

  # Initialise private instance variables
  @last_ping_request = current_time
  @last_ping_response = current_time
  @socket = nil
  @read_queue = Queue.new
  @pubacks = {}
  @read_thread = nil
  @write_semaphore = Mutex.new
  @pubacks_semaphore = Mutex.new
end
Public Instance Methods
Set a path to a file containing a PEM-format CA certificate and enable peer verification
# File lib/mqtt/client.rb, line 213
# Point the SSL context at a CA certificate file.
#
# Peer verification (VERIFY_PEER) is switched on whenever a non-nil
# path is assigned; assigning nil leaves the verify mode untouched.
#
# @param path [String, nil] path to a PEM-format CA certificate
def ca_file=(path)
  context = ssl_context
  context.ca_file = path
  context.verify_mode = OpenSSL::SSL::VERIFY_PEER unless path.nil?
end
PEM-format client certificate
# File lib/mqtt/client.rb, line 196
# Install the client certificate on the SSL context.
#
# @param cert [String] certificate data, handed to
#   OpenSSL::X509::Certificate.new for parsing
def cert=(cert)
  certificate = OpenSSL::X509::Certificate.new(cert)
  ssl_context.cert = certificate
end
Set a path to a file containing a PEM-format client certificate
# File lib/mqtt/client.rb, line 191 def cert_file=(path) self.cert = File.read(path) end
Clear the incoming message queue.
# File lib/mqtt/client.rb, line 452 def clear_queue @read_queue.clear end
Connect to the MQTT server. If a block is given, then yield to that block and then disconnect again.
# File lib/mqtt/client.rb, line 231 def connect(clientid = nil) @client_id = clientid unless clientid.nil? if @client_id.nil? || @client_id.empty? raise 'Must provide a client_id if clean_session is set to false' unless @clean_session # Empty client id is not allowed for version 3.1.0 @client_id = MQTT::Client.generate_client_id if @version == '3.1.0' end raise 'No MQTT server host set when attempting to connect' if @host.nil? unless connected? # Create network socket tcp_socket = open_tcp_socket if @ssl # Set the protocol version ssl_context.ssl_version = @ssl if @ssl.is_a?(Symbol) @socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context) @socket.sync_close = true # Set hostname on secure socket for Server Name Indication (SNI) @socket.hostname = @host if @socket.respond_to?(:hostname=) @socket.connect @socket.post_connection_check(@host) if @verify_host else @socket = tcp_socket end # Construct a connect packet packet = MQTT::Packet::Connect.new( :version => @version, :clean_session => @clean_session, :keep_alive => @keep_alive, :client_id => @client_id, :username => @username, :password => @password, :will_topic => @will_topic, :will_payload => @will_payload, :will_qos => @will_qos, :will_retain => @will_retain ) # Send packet send_packet(packet) # Receive response receive_connack # Start packet reading thread @read_thread = Thread.new(Thread.current) do |parent| Thread.current[:parent] = parent receive_packet while connected? end end return unless block_given? # If a block is given, then yield and disconnect begin yield(self) ensure disconnect end end
Checks whether the client is connected to the server.
# File lib/mqtt/client.rb, line 321
# Report whether a socket exists and has not been closed.
#
# @return [Boolean]
def connected?
  !(@socket.nil? || @socket.closed?)
end
Disconnect from the MQTT server. If you don’t want to say goodbye to the server, set send_msg to false.
# File lib/mqtt/client.rb, line 303
# Stop the reader thread and close the connection to the server.
#
# The socket is closed, waiting publishers are notified through
# handle_close and @socket is cleared even when sending the Disconnect
# packet raises; the error still propagates to the caller.
#
# @param send_msg [Boolean] send a Disconnect packet before closing
# @return [nil]
def disconnect(send_msg = true)
  # Stop reading packets from the socket first
  @read_thread.kill if @read_thread && @read_thread.alive?
  @read_thread = nil

  return unless connected?

  begin
    send_packet(MQTT::Packet::Disconnect.new) if send_msg
  ensure
    # Always release the socket: otherwise a failed write would leave the
    # client reporting connected? with no reader thread running.
    @socket.close unless @socket.nil?
    handle_close
    @socket = nil
  end

  nil
end
Return the next message received from the MQTT server. An optional topic can be given to subscribe to.
The method either returns the topic and message as an array:
topic,message = client.get
Or can be used with a block to keep processing messages:
client.get('test') do |topic,payload|
  # Do stuff here
end
# File lib/mqtt/client.rb, line 395
# Fetch published messages as topic/payload pairs via #get_packet.
#
# Without a block, returns the first acceptable message as
# [topic, payload]. With a block, yields topic and payload for each
# acceptable message. A message is skipped when it is retained and
# options[:omit_retained] is truthy.
#
# @param topic [String, nil] passed through to #get_packet
# @param options [Hash] only :omit_retained is consulted
def get(topic = nil, options = {})
  skip = ->(packet) { packet.retain && options[:omit_retained] }

  if block_given?
    get_packet(topic) do |packet|
      yield(packet.topic, packet.payload) unless skip.call(packet)
    end
  else
    loop do
      # Wait for one packet to be available
      candidate = get_packet(topic)
      return candidate.topic, candidate.payload unless skip.call(candidate)
    end
  end
end
Return the next packet object received from the MQTT server. An optional topic can be given to subscribe to.
The method either returns a single packet:
packet = client.get_packet puts packet.topic
Or can be used with a block to keep processing messages:
client.get_packet('test') do |packet|
  # Do stuff here
  puts packet.topic
end
# File lib/mqtt/client.rb, line 422
# Take packets from the incoming read queue, acknowledging those
# with a QoS above 0.
#
# Without a block, one packet is popped, acknowledged if needed and
# returned. With a block, packets are popped in an endless loop; each
# one is yielded first and acknowledged afterwards.
#
# @param topic [String, nil] subscribed to first when not nil
def get_packet(topic = nil)
  # Subscribe to a topic, if an argument is given
  subscribe(topic) unless topic.nil?

  unless block_given?
    # Wait for one packet to be available
    single = @read_queue.pop
    puback_packet(single) if single.qos > 0
    return single
  end

  # Loop forever!
  loop do
    incoming = @read_queue.pop
    yield(incoming)
    puback_packet(incoming) if incoming.qos > 0
  end
end
Set to a PEM-format client private key
# File lib/mqtt/client.rb, line 207
# Install the client private key on the SSL context.
#
# The assigned value may be the key data alone or an Array of
# [key data, passphrase]; the arguments are flattened before use.
def key=(*args)
  pem, passphrase = args.flatten
  ssl_context.key = OpenSSL::PKey.read(pem, passphrase)
end
Set a path to a file containing a PEM-format client private key
# File lib/mqtt/client.rb, line 201
# Read a private key from a file and install it on the SSL context.
#
# The assigned value may be the path alone or an Array of
# [path, passphrase]. The file is read in binary mode.
def key_file=(*args)
  path, passphrase = args.flatten
  key_data = File.binread(path)
  ssl_context.key = OpenSSL::PKey.read(key_data, passphrase)
end
Publish a message on a particular topic to the MQTT server.
# File lib/mqtt/client.rb, line 326
# Publish a message on a topic.
#
# For QoS 0 the packet is sent and nil is returned. For higher QoS the
# call blocks until an acknowledgement arrives on the queue registered
# via wait_for_puback, the connection closes, or @ack_timeout seconds
# have passed.
#
# @param topic [String] topic name; must be neither nil nor empty
# @param payload [String] message body
# @param retain [Boolean] retain flag placed on the packet
# @param qos [Integer] quality of service level
# @return [nil] for QoS 0
# @return [Integer] -1 on timeout or when the connection was closed
# @return [Object] the result of send_packet once acknowledged
# @raise [ArgumentError] if topic is nil or empty
def publish(topic, payload = '', retain = false, qos = 0)
  raise ArgumentError, 'Topic name cannot be nil' if topic.nil?
  raise ArgumentError, 'Topic name cannot be empty' if topic.empty?

  packet = MQTT::Packet::Publish.new(
    :id => next_packet_id,
    :qos => qos,
    :retain => retain,
    :topic => topic,
    :payload => payload
  )

  # Register for the acknowledgement before sending so it cannot be missed
  queue = qos.zero? ? nil : wait_for_puback(packet.id)

  begin
    res = send_packet(packet)
    return unless queue

    deadline = current_time + @ack_timeout

    loop do
      case queue.pop
      when :read_timeout
        # Pushed by handle_timeouts on every reader poll; only give up
        # once the overall deadline has passed.
        return -1 if current_time > deadline
      when :close
        return -1
      else
        break
      end
    end

    res
  ensure
    # Drop the registration on every exit path (ack, timeout, close or a
    # failed send) so abandoned queues do not pile up in @pubacks.
    @pubacks_semaphore.synchronize { @pubacks.delete(packet.id) } if queue
  end
end
Returns true if the incoming message queue is empty.
# File lib/mqtt/client.rb, line 442 def queue_empty? @read_queue.empty? end
Returns the length of the incoming message queue.
# File lib/mqtt/client.rb, line 447 def queue_length @read_queue.length end
@deprecated Please use {#host} instead
# File lib/mqtt/client.rb, line 623 def remote_host host end
@deprecated Please use {#host=} instead
# File lib/mqtt/client.rb, line 628 def remote_host=(args) self.host = args end
@deprecated Please use {#port} instead
# File lib/mqtt/client.rb, line 633 def remote_port port end
@deprecated Please use {#port=} instead
# File lib/mqtt/client.rb, line 638 def remote_port=(args) self.port = args end
Set the Will for the client
The will is a message that will be delivered by the server when the client dies. The Will must be set before establishing a connection to the server
# File lib/mqtt/client.rb, line 222 def set_will(topic, payload, retain = false, qos = 0) self.will_topic = topic self.will_payload = payload self.will_retain = retain self.will_qos = qos end
Get the OpenSSL context, that is used if SSL/TLS is enabled
# File lib/mqtt/client.rb, line 186 def ssl_context @ssl_context ||= OpenSSL::SSL::SSLContext.new end
Send a subscribe message for one or more topics on the MQTT server. The topics parameter should be one of the following:
- String: subscribe to one topic with QoS 0
- Array: subscribe to multiple topics with QoS 0
- Hash: subscribe to multiple topics where the key is the topic and the value is the QoS level
For example:
client.subscribe( 'a/b' )
client.subscribe( 'a/b', 'c/d' )
client.subscribe( ['a/b',0], ['c/d',1] )
client.subscribe( 'a/b' => 0, 'c/d' => 1 )
# File lib/mqtt/client.rb, line 376
# Send a Subscribe packet carrying the given topics.
#
# The topics are handed to MQTT::Packet::Subscribe exactly as received
# (an Array of the splatted arguments), together with a fresh packet id.
#
# @param topics [Array] topic arguments
def subscribe(*topics)
  send_packet(
    MQTT::Packet::Subscribe.new(
      :id => next_packet_id,
      :topics => topics
    )
  )
end
Send an unsubscribe message for one or more topics on the MQTT server
# File lib/mqtt/client.rb, line 457
# Send an Unsubscribe packet for the given topics.
#
# When exactly one argument is given it is unwrapped, so a single
# String or a single Array of topics is passed on as-is.
#
# @param topics [Array] topic arguments
def unsubscribe(*topics)
  # A splat is always an Array, so only the element count matters here
  topic_list = topics.count == 1 ? topics.first : topics

  send_packet(
    MQTT::Packet::Unsubscribe.new(
      :topics => topic_list,
      :id => next_packet_id
    )
  )
end
Private Instance Methods
# File lib/mqtt/client.rb, line 525 def current_time Process.clock_gettime(Process::CLOCK_MONOTONIC) end
# File lib/mqtt/client.rb, line 518
# Push :close onto every registered acknowledgement queue while holding
# @pubacks_semaphore, so publishers blocked on those queues wake up.
def handle_close
  @pubacks_semaphore.synchronize do
    @pubacks.each_value do |waiter|
      waiter.push(:close)
    end
  end
end
# File lib/mqtt/client.rb, line 497
# Dispatch a packet read from the server according to its exact class.
#
# Publish packets go onto the read queue, Pingresp packets refresh the
# last ping response time, and Puback packets are delivered to the
# queue registered for their id, if there is one. Any other packet is
# ignored.
#
# @param packet [MQTT::Packet] packet to dispatch
def handle_packet(packet)
  if packet.instance_of?(MQTT::Packet::Publish)
    # Add to queue
    @read_queue.push(packet)
  elsif packet.instance_of?(MQTT::Packet::Pingresp)
    @last_ping_response = current_time
  elsif packet.instance_of?(MQTT::Packet::Puback)
    @pubacks_semaphore.synchronize do
      waiter = @pubacks[packet.id]
      waiter << packet if waiter
    end
  end
  # Ignore all other packets
  # FIXME: implement responses for QoS 2
end
# File lib/mqtt/client.rb, line 512
# Push :read_timeout onto every registered acknowledgement queue while
# holding @pubacks_semaphore.
def handle_timeouts
  @pubacks_semaphore.synchronize do
    @pubacks.each_value do |waiter|
      waiter.push(:read_timeout)
    end
  end
end
# File lib/mqtt/client.rb, line 535
# Send a ping when one is due, or fail when the server has stopped
# answering pings.
#
# Does nothing unless @keep_alive is positive and the client is
# connected. A Pingreq is sent once @keep_alive seconds have passed
# since the last request; otherwise, an error is raised if more than
# 1.5 x @keep_alive seconds (rounded up) have passed since the last
# ping response.
#
# @raise [MQTT::ProtocolException] when no ping response arrived in time
def keep_alive!
  return unless @keep_alive > 0 && connected?

  grace_period = (@keep_alive * 1.5).ceil

  if current_time >= @last_ping_request + @keep_alive
    send_packet(MQTT::Packet::Pingreq.new)
    @last_ping_request = current_time
  elsif current_time > @last_ping_response + grace_period
    raise MQTT::ProtocolException, "No Ping Response received for #{grace_period} seconds"
  end
end
# File lib/mqtt/client.rb, line 600
# Advance and return the packet id counter.
#
# Ids start at 1 and wrap back to 1 after 0xffff.
#
# @return [Integer]
def next_packet_id
  candidate = (@last_packet_id || 0).next
  @last_packet_id = candidate > 0xffff ? 1 : candidate
end
# File lib/mqtt/client.rb, line 606
# Open a TCP socket to @host:@port, bounded by @connect_timeout.
#
# On Ruby 3.0 and later the timeout is passed to TCPSocket directly.
# On older versions the connect is wrapped in Timeout.timeout and a
# Timeout::Error is converted to IO::TimeoutError when that class is
# defined, or Errno::ETIMEDOUT otherwise.
#
# @return [TCPSocket]
def open_tcp_socket
  if RUBY_VERSION.to_f >= 3.0
    return TCPSocket.new(@host, @port, connect_timeout: @connect_timeout)
  end

  begin
    return Timeout.timeout(@connect_timeout) { TCPSocket.new(@host, @port) }
  rescue Timeout::Error
    message = "Connection timed out for \"#{@host}\" port #{@port}"
    raise IO::TimeoutError, message if defined? IO::TimeoutError

    raise Errno::ETIMEDOUT, message
  end
end
# File lib/mqtt/client.rb, line 581
# Turn an mqtt:// or mqtts:// URI into a Hash of client attributes.
#
# @param uri [URI, String] parsed with URI.parse unless already a URI
# @return [Hash] :host, :port, :username, :password and :ssl; username
#   and password are percent-decoded, and :ssl is true only for mqtts
# @raise [RuntimeError] for any other scheme
def parse_uri(uri)
  uri = URI.parse(uri) unless uri.is_a?(URI)

  ssl =
    case uri.scheme
    when 'mqtt' then false
    when 'mqtts' then true
    else raise 'Only the mqtt:// and mqtts:// schemes are supported'
    end

  decode = ->(value) { value ? URI::DEFAULT_PARSER.unescape(value) : nil }

  {
    :host => uri.host,
    :port => uri.port || nil,
    :username => decode.call(uri.user),
    :password => decode.call(uri.password),
    :ssl => ssl
  }
end
# File lib/mqtt/client.rb, line 548 def puback_packet(packet) send_packet(MQTT::Packet::Puback.new(:id => packet.id)) end
Read and check a connection acknowledgement packet
# File lib/mqtt/client.rb, line 553
# Read one packet from the socket and verify that it is a successful
# connection acknowledgement, waiting at most @ack_timeout seconds.
#
# @raise [MQTT::ProtocolException] if the packet is not a Connack, or
#   if its return code is non-zero (the socket is closed first)
def receive_connack
  Timeout.timeout(@ack_timeout) do
    response = MQTT::Packet.read(@socket)

    unless response.class == MQTT::Packet::Connack
      raise MQTT::ProtocolException, "Response wasn't a connection acknowledgement: #{response.class}"
    end

    # Check the return code
    unless response.return_code == 0x00
      # 3.2.2.3 If a server sends a CONNACK packet containing a non-zero
      # return code it MUST then close the Network Connection
      @socket.close
      raise MQTT::ProtocolException, response.return_msg
    end
  end
end
Try to read a packet from the server. Also sends keep-alive ping packets.
# File lib/mqtt/client.rb, line 471 def receive_packet # Poll socket - is there data waiting? result = IO.select([@socket], [], [], SELECT_TIMEOUT) handle_timeouts unless result.nil? # Yes - read in the packet packet = MQTT::Packet.read(@socket) handle_packet packet end keep_alive! # Pass exceptions up to parent thread rescue ::Exception => exp unless @socket.nil? @socket.close @socket = nil handle_close end Thread.current[:parent].raise(exp) end
Send a packet to server
# File lib/mqtt/client.rb, line 571 def send_packet(data) # Raise exception if we aren't connected raise MQTT::NotConnectedException unless connected? # Only allow one thread to write to socket at a time @write_semaphore.synchronize do @socket.write(data.to_s) end end
# File lib/mqtt/client.rb, line 491
# Register a fresh Queue under the given packet id in @pubacks, while
# holding @pubacks_semaphore.
#
# @param id [Integer] packet id to wait on
# @return [Queue] the queue that was registered
def wait_for_puback(id)
  @pubacks_semaphore.synchronize { @pubacks[id] = Queue.new }
end