Define Max Queue Length Using a Policy
To specify a maximum length using policy, add the key
max-length and / or max-length-bytes
to a policy definition. For example:
rabbitmqctl |
rabbitmqctl set_policy my-pol "^one-meg$" \ '{"max-length-bytes":1048576}' \ --apply-to queues |
---|---|
rabbitmqctl on Windows |
rabbitmqctl.bat set_policy my-pol "^one-meg$" ^ "{""max-length-bytes"":1048576}" ^ --apply-to queues |
The my-pol policy ensures that the one-meg
queue contains no more than 1MiB of message data. When the 1MiB limit
is reached, the oldest messages are discarded from the head of the
queue.
To define an overflow behaviour — whether to drop messages from head
or to reject new publishes, add the key overflow to a
policy definition. For example:
rabbitmqctl |
rabbitmqctl set_policy my-pol "^two-messages$" \ '{"max-length":2,"overflow":"reject-publish"}' \ --apply-to queues |
---|---|
rabbitmqctl on Windows |
rabbitmqctl.bat set_policy my-pol "^two-messages$" ^ "{""max-length"":2,""overflow"":""reject-publish""}" ^ --apply-to queues |
The my-pol policy ensures that the two-messages
queue contains no more than 2 messages and all additional publishes
are sent basic.nack responses as long as the queue
contains 2 messages and publisher confirms are enabled.
Policies can also be defined using the management plugin, see
the for more details.
«Node-local» and «Clusterwide» Commands
Client connections, channels and queues will be distributed across cluster nodes.
Operators need to be able to inspect and monitor such resources
across all cluster nodes.
CLI tools such as rabbitmqctl and
rabbitmq-diagnostics provide commands that inspect resources and
cluster-wide state. Some commands focus on the state of a single node
(e.g. rabbitmq-diagnostics environment and rabbitmq-diagnostics
status), others inspect cluster-wide state. Some examples of the
latter include rabbitmqctl list_connections, rabbitmqctl
list_mqtt_connections, rabbitmqctl list_stomp_connections,
rabbitmqctl list_users, rabbitmqctl list_vhosts and so on.
Such «cluster-wide» commands will often contact one node
first, discover cluster members and contact them all to
retrieve and combine their respective state. For example,
rabbitmqctl list_connections will contact all
nodes, retrieve their AMQP 0-9-1 and AMQP 1.0 connections,
and display them all to the user. The user doesn’t have
to manually contact all nodes.
Assuming a non-changing
state of the cluster (e.g. no connections are closed or
opened), two CLI commands executed against two different
nodes one after another will produce identical or
semantically identical results. «Node-local» commands, however, likely will not produce
identical results since two nodes rarely have entirely identical state.
Names
Queues have names so that applications can reference them.
Applications may pick queue names or ask the broker to
for them. Queue names may be up to 255 bytes of UTF-8 characters.
Queue names starting with «amq.» are reserved for internal
use by the broker. Attempts to declare a queue with a name that
violates this rule will result in a channel-level exception
with reply code 403 (ACCESS_REFUSED).
In AMQP 0-9-1, the broker can generate a unique queue name on behalf of
an app. To use this feature, pass an empty string as the queue name
argument: The same generated name may be obtained by subsequent
methods in the same channel by using the empty string where a queue
name is expected. This works because the channel remembers the last
server-generated queue name.
Server-named queues are meant to be used for state that is transient
in nature and specific to a particular consumer (application instance).
Applications can share such names in message metadata to let other applications respond
to them (as demonstrated in tutorial six).
Otherwise, the names of server-named queues should be known and used only by the
declaring application instance. The instance should also set up appropriate
bindings (routing) for the queue, so that publishers can use well-known
exchanges instead of the server-generated queue name directly.
Обменник
Default exchange
RabbitMQ всегда создает безымянный обменник типа direct.
На этот обменник по умолчанию завязаны все очереди, где routingKey равен имени очереди.
Отправка сообщения напрямую в очередь на самом деле это отправка сообщения безымянному обменнику с routingKey нужной очереди.
Как итог это выглядит и работает как отправка сообщения напрямую в очередь, но сообщения всегда отправляются через обменник.
Тип обменика (один из четырех вариантов)
- direct — прямая отправка в очередь, используется routingKey. Сообщение с каким-то routingKey будет отправленно обменником
в очередь связанную с этим обменником с тем же routingKey, иначе сброшено(удалено). - fanout — routingKey игнорируется, сообщения будут отправлены во все очереди связанные с обменником
- topic — похож на direct, сообщения отправляются с routingKey, но очереди связываются с обменником по шаблонной строке, сообщение попадет в те очереди с шаблонами которых совпадает routingKey
- headers — (todo)
Создание обменника
assertExchange(exchange_name, type, options)
- указать имя обменника
- тип обменника
- перечень аргументов
- durable (bool) — сохранять и восстанавливать после рестарта RabbitMQ
- autoDelete (bool) — удалится после того как все очереди отключатся
- (todo)
Связывание обменника
bindQueue(queue_name, exhange_name, pattern, args)
Необходимо указать:
- имя очереди
- имя обменника
- шаблон или routingKey (зависит от типа обменника)
Удаленное управление кластером
Вы можете ввести SSH в каждый блок и выполнить вышеупомянутые шаги для каждого блока вручную. Это работает, но очень быстро устаревает. Кроме того, это нецелесообразно, если вы хотите построить и разрушить кластер как часть автоматизированного теста.
Одним из решений является использование Fabric. Одна серьезная ошибка, с которой я столкнулся, заключается в том, что когда я выполнял алгоритм кластера сборки вручную, он работал отлично, но когда я использовал Fabric, он таинственным образом не работал. После некоторой отладки я заметил, что узлы запустились успешно, но к тому времени, когда я попытался stop_app, узлы были недоступны. Это оказалось ошибкой новичка Fabric с моей стороны. Когда вы запускаете удаленную команду с помощью Fabric, она запускает новую оболочку на удаленном компьютере. Когда команда завершена, оболочка закрывается, отправляя сигнал SIGHUP (сигнал зависания) всем его подпроцессам, включая узел Erlang. Использование nohup позаботится об этом. Другой более надежный вариант — запуск RabbitMQ в качестве службы (демона).
Upgrading From pre-3.7 Versions
RabbitMQ versions prior to 3.7.0 had a different logging subsystem.
Older installations use two log files:
<nodename>.log and <nodename>_sasl.log (<nodename> is rabbit@{hostname} by default).
Where <nodename>.log contains RabbitMQ logs, while <nodename>_sasl.log contains
runtime logs, mostly unhandled exceptions.
Starting with 3.7.0 these two files were merged and all errors now can be found in
the <nodename>.log file. So RABBITMQ_SASL_LOGS environment variable is not used
anymore.
Log levels in versions before 3.7.0 were configured using the log_levels configuration key.
Starting with 3.7.0 it’s been replaced with ,
which are more descriptive and powerful.
If the log_levels key is present in rabbitmq.config file, it should be updated to
.
rabbit.log_levels will work in 3.7.0 only if no categories are defined.
rabbitmq_management
rabbitmq_management — это веб-панель управления очередями RabbitMQ.
Перед активацией веб-панели rabbitmq должен быть выключенным.
По умолчанию логин и пароль guest. Нет необходимости менять этого пользователя или пароля для него. Он доступен только в localhost.
Активация панели:
usrlocalsbinrabbitmq-plugins enable amqp_client
Адрес на локалке:
http://127.0.0.1:15672/
Linux
Активация:
sudo rabbitmq-plugins enable rabbitmq_management
Адрес на сервере:
http://{сайт или IP}:15672/
Если настроен фаервол ufw, то откроем для него порты:
sudo ufw allow proto tcp from any to any port 5672,15672
Добавим нового пользователя:
sudo rabbitmqctl add_user user_ploshadka.net пароль
Добавим его в группу администраторов:
sudo rabbitmqctl set_user_tags user_ploshadka.net administrator
Log Message Categories
RabbitMQ has several categories of messages, which can be logged with different
levels or to different files.
The categories replace the rabbit.log_levels configuration setting in versions
earlier than 3.7.0.
The categories are:
- connection: for AMQP 0-9-1, AMQP 1.0, MQTT and STOMP.
- channel: channel logs. Mostly errors and warnings on AMQP 0-9-1 channels.
- queue: queue logs. Mostly debug messages.
- mirroring: queue mirroring logs. Queue mirrors status changes: starting/stopping/synchronizing.
- federation: federation plugin logs.
- upgrade: verbose upgrade logs. These can be excessive.
- default: all other log entries. You cannot override file location for this category.
It is possible to configure a different log level or file location for each message category
using log.<category>.level and log.<category>.file configuration variables.
By default each category will not filter by level. If an is output configured to log debug
messages, the debug messages will be printed for all categories. Configure a log level for a
category to override.
For example, given debug level in the file output,
the following will disable debug logging for connection events:
log.file.level = debug log.connection.level = info
To redirect all federation logs to the rabbit_federation.log file, use:
log.federation.file = rabbit_federation.log
To disable a log type, you can use the none log level. For example, to disable
upgrade logs:
log.upgrade.level = none
Log levels is another way to filter and tune logging. Each log level has a severity associated with it.
More critical messages have lower severity number, while debug has the highest number.
The following log levels are used by RabbitMQ:
Log level | Severity |
---|---|
debug | 128 |
info | 64 |
warning | 16 |
error | 8 |
critical | 4 |
none |
Default log level is info.
If the level of a log message is higher than the category level,
the message will be dropped and not sent to any output.
If a category level is not configured, its messages will always be sent
to all outputs.
To make the default category log only errors or higher severity messages, use
log.default.level = error
The none level means no logging.
Each output can use its own log level. If a message
level number is higher than the output level, the message will not be logged.
For example, if no outputs are configured to log
debug messages, even if the category level is set to debug, the
debug messages will not be logged.
Although, if an output is configured to log debug messages,
it will get them from all categories, unless a category level is configured.
There are two ways of changing effective log levels:
- Via configuration file(s): this is more flexible but requires
a node restart between changes - Using CLI tools, rabbitmqctl set_log_level <level>: the changes are transient (will not survive node restart) but can be used to
enable and disable e.g. at runtime for a period of time.
To set log level to debug on a running node:
rabbitmqctl -n rabbit@target-host set_log_level debug
To set the level to info:
rabbitmqctl -n rabbit@target-host set_log_level info
Introduction
This guide provides an overview of queues in RabbitMQ. Since
many features in a messaging system are related to queues, it
is not meant to be an exhaustive guide but rather an overview
that provides links to other guides.
This guide covers queues primarily in the context of AMQP 0-9-1,
however, much of the content is applicable to other supported protocols.
Some protocols (e.g. STOMP and MQTT) are based around the idea of topics.
For them, queues act as data accumulation buffer for consumers.
However, it is still important to understand the role queues play
because many features still operate at the queue level, even for those protocols.
Streams is an alternative messaging data structure available in RabbitMQ.
Streams provide different features from queues.
Consumers and Acknowledgements
Messages can be consumed by registering a consumer (subscription),
which means RabbitMQ will push messages to the client, or fetched
individually for protocols that support this (e.g. the basic.get AMQP 0-9-1 method),
similarly to HTTP GET.
Delivered messages can be acknowledged by consumer explicitly
or automatically as soon as a delivery is written to connection socket.
Automatic acknowledgement mode generally will provide higher throughput
rate and uses less network bandwidth. However, it offers the least number
of guarantees when it comes to failures. As a rule of
thumb, consider using manual acknowledgement mode first.
Automatic acknowledgement mode can also overwhelm
consumers which cannot process messages as quickly as they are delivered.
This can result in permanently growing memory usage and/or
OS swapping for the consumer process.
Manual acknowledgement mode provides a way to set a limit on the number
of outstanding (unconfirmed) deliveries: channel QoS (prefetch).
Consumers using higher (several thousands or more) prefetch levels can experience
the same overload problem as consumers using automatic acknowledgements.
High number of unacknowledged messages will lead to higher memory usage by
the broker.
Enqueued messages therefore can be in one of two states:
- Ready for delivery
- Delivered but not yet acknowledged by consumer
Message breakdown by state can be found in the management UI.
Подключить RabbitMQ в проект с помощью Composer
У меня уже были заметки про установку библиотек с помощью Composer`а, это очень удобно. Создаем в папке с проектом файл composer.json и пишем туда такое содержимое.
{ "require" : { "php-amqplib/php-amqplib" : ">=2.6.1" } }
Далее открываем папку с проектом и терминале и запускаем установку зависимостей
composer install
После этого в корне проекта появится папка vendor с установленной в ней библиотекой RabbitMQ.
Возможные проблемы! Иногда композер не может найти некоторые библиотеки в системе, например, у меня не нашлось двух библиотек, их нужно просто доустановить. В моем случае это были php-bcmath и php-mbstring, установить их легко.
apt-get install php-bcmath apt-get install php-mbstring
Следующим шагом будет подключить файл autoloader.php в файлы исходного кода в своем проекте с помощью require_once, например, если файл script.php лежит в корне проекта, то строчка будет выглядеть вот так
require_once __DIR__ . '/vendor/autoload.php';
С этого момента можно пользоваться библиотечными функциями.
Properties
Queues have properties that define how they behave. There is a set
of mandatory properties and a map of optional ones:
- Name
- Durable (the queue will survive a broker restart)
- Exclusive (used by only one connection and the queue will be deleted when that connection closes)
- Auto-delete (queue that has had at least one consumer is deleted when last consumer unsubscribes)
- Arguments (optional; used by plugins and broker-specific features such as message TTL, queue length limit, etc)
Note that not all property combination make sense in practice. For example, auto-delete
and exclusive queues should be . Such queues are supposed to
be used for client-specific or connection (session)-specific data.
When auto-delete or exclusive queues use well-known (static) names, in case of client disconnection
and immediate reconnection there will be a natural race condition between RabbitMQ nodes
that will delete such queues and recovering clients that will try to re-declare them.
This can result in client-side connection recovery failure or exceptions, and create unnecessary confusion
or affect application availability.
Before a queue can be used it has to be declared. Declaring
a queue will cause it to be created if it does not already
exist. The declaration will have no effect if the queue does
already exist and its attributes are the same as those in the
declaration. When the existing queue attributes are not the
same as those in the declaration a channel-level exception
with code 406 (PRECONDITION_FAILED) will be raised.
Optional queue arguments, also known as «x-arguments» because of their
field name in the AMQP 0-9-1 protocol, is a map (dictionary) of arbitrary key/value
pairs that can be provided by clients when a queue is declared.
The map is used by various features and plugins such as
- Queue type (e.g. quorum or classic)
- Message and queue TTL
- Queue length limit
- Legacy classic queue mirroring settings
- Max number of priorities
- Consumer priorities
and so on.
Most optional arguments can be dynamically changed after queue declaration but there are
exceptions. For example, queue type (x-queue-type) and max number
of queue priorities (x-max-priority) must be set at queue declaration time
and cannot be changed after that.
Optional queue arguments can be set in a couple of ways:
- To groups of queues using (recommended)
- On a per-queue basis when a queue is declared by a client
The former option is more flexible, non-intrusive, does not require application
modifications and redeployments. Therefore it is highly recommended for most users.
Note that some optional arguments such as queue type or max number of priorities can
only be provided by clients because they cannot be dynamically changed and must be known
at declaration time.
The way optional arguments are provided by clients varies from client library
to client library but is usually an argument next to the durable,
auto_delete and other arguments of the function (method) that
declares queues.
Tailing Logs Using CLI Tools
Modern releases support tailing logs of a node using CLI tools. This is convenient
when log file location is not known or is not easily accessible but CLI tool connectivity
is allowed.
To tail three hundred last lines on a node rabbitmq@target-host, use rabbitmq-diagnostics log_tail:
# This is semantically equivalent to using `tail -n 300 /path/to/[email protected]`. # Use -n to specify target node, -N is to specify the number of lines. rabbitmq-diagnostics -n rabbit@target-host log_tail -N 300
This will load and print last lines from the log file.
If only console logging is enabled, this command will fail with a «file not found» (enoent) error.
To continuously inspect as a stream of log messages as they are appended to a file,
similarly to tail -f or console logging, use rabbitmq-diagnostics log_tail_stream:
# This is semantically equivalent to using `tail -f /path/to/[email protected]`. # Use Control-C to stop the stream. rabbitmq-diagnostics -n rabbit@target-host log_tail_stream
This will continuously tail and stream lines added to the log file.
If only console logging is enabled, this command will fail with a «file not found» (enoent) error.
The rabbitmq-diagnostics log_tail_stream command can only be used against a running RabbitMQ node
and will fail if the node is not running or the RabbitMQ application on it
was stopped using rabbitmqctl stop_app.
Project Maturity
This plugin is considered to be experimental yet fairly stable and potential suitable for production use
as long as the user is aware of its limitations.
It had a few issues and one fundamental problem fixed in its ~ 18 months of
existence. It is known to work reasonably well for some users.
It also has known limitations (see a section below),
including those related to the replication of delayed and messages and the number of delayed messages.
This plugin is not commercially supported by Pivotal at the moment but
it doesn’t mean that it will be abandoned or team RabbitMQ is not interested
in improving it in the future. It is not, however, a high priority for our small team.
So, give it a try with your workload and decide for yourself.
Установка RabbitMQ на Mac OS
Прежде чем отправить работать нашего почтальона на боевом сервере надо где-то все это отладить. Т.к. я использую Mac OS, то буду описывать процесс установки на эту операционную систему.
Как и все сторонние пакеты в Mac OS RabbitMQ устанавливается через менеджер пакетов Homebrew.
Установка:
brew update brew install rabbitmq
Потом надо его запустить:
brew services start rabbitmq
Добавить переменную окружения перед вводом команд на Mac OS. В противном случае будут ошибки:
zsh: command not found: rabbitmqctl
Для исправление ввести:
export PATH=$PATH:usrlocaloptrabbitmqsbin
Если потребуется перезагрузкить кролика:
brew services restart rabbitmq
Оговорки
Такой код в неизменённом виде не должен использоваться на боевых проектах, не должен выполняться на боевых серверах. Здесь отсутствуют необходимые проверки безопасности и валидация. Этот код был написан только в учебных целях, а так же для того чтобы предоставить обзор имеющихся возможностей. При написании кода в приоритетах не стояли производительность, эффективность или повторное использование кода.
Обратите внимание, что даже если текущие примеры используют отправителя, брокера и получателя, которые располагаются на одном хосте для облегчения разработки и тестирования, в реальной жизни нет смысла держать “распределённую систему” на одной и той же машине. Полный исходный код находится в этом репозитории на GitHub, и содержит приложение, используемое в следующих примерах
Полный исходный код находится в этом репозитории на GitHub, и содержит приложение, используемое в следующих примерах.
Защита информации при обмене данными между информационными базами «Управление производственным предприятием»
Защита информации при обмене с распределенной базой данных.
В крупной фирме имеется одна центральная информационная база 1С:Предприятие 8.1 «Управление производственным предприятием» и несколько периферийных баз, обмен данными происходит через файлы XML. После выявления многочисленных случаев несанкционированного доступа к конфиденциальной информации, а именно кассовым и банковским документам при обмене данными, руководством было решено защитить данные при обмене информацией между центральной базой и периферийными базами. Перемещение данных между базами происходит через файлы XML, по нескольким каналам связи FTP, HTTP, POP, SMTP, а также на сменных носителях (типа USB Flash Drive). В связи с этим решено использовать внешнюю компоненту для шифрования файлов XML и небольшой доработки конфигурации баз данных.
Examples
Below is a policy where queues whose names begin with
«two.» are mirrored to any two nodes in the
cluster, with :
rabbitmqctl |
rabbitmqctl set_policy ha-two "^two\." \ '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}' |
---|---|
rabbitmqctl (Windows) |
rabbitmqctl.bat set_policy ha-two "^two\." ^ "{""ha-mode"":""exactly"",""ha-params"":2,""ha-sync-mode"":""automatic""}" |
HTTP API |
PUT /api/policies/%2f/ha-two { "pattern":"^two\.", "definition": { "ha-mode":"exactly", "ha-params":2, "ha-sync-mode":"automatic" } } |
Web UI |
|
The following example declares a policy which matches
the queues whose names begin with «ha.» and configures
mirroring to all nodes in the cluster.
Note that mirroring to all nodes is rarely necessary and will result
in unnecessary resource waste.
See above:
rabbitmqctl |
# Note that mirroring to all nodes is rarely necessary. # Consider mirroring to the majority (N/2+1) nodes with "ha-mode":"exactly" instead. rabbitmqctl set_policy ha-all "^ha\." '{"ha-mode":"all"}' |
---|---|
rabbitmqctl (Windows) |
# Note that mirroring to all nodes is rarely necessary. # Consider mirroring to the majority (N/2+1) nodes with "ha-mode":"exactly" instead. rabbitmqctl.bat set_policy ha-all "^ha\." "{""ha-mode"":""all""}" |
HTTP API |
PUT /api/policies/%2f/ha-all {"pattern":"^ha\.", "definition":{"ha-mode":"all"}} |
Web UI |
|
A policy where queues whose names begin with
«nodes.» are mirrored to specific nodes in the
cluster:
rabbitmqctl |
rabbitmqctl set_policy ha-nodes "^nodes\." \ '{"ha-mode":"nodes","ha-params":}' |
---|---|
rabbitmqctl (Windows) |
rabbitmqctl set_policy ha-nodes "^nodes\." ^ "{""ha-mode"":""nodes"",""ha-params"":}" |
HTTP API |
PUT /api/policies/%2f/ha-nodes {"pattern":"^nodes\.", "definition":{"ha-mode":"nodes", "ha-params":} |
Web UI |
|
Getting Started
UNIX-like operating system users need to copy rabbitmqadmin to a directory in PATH, e.g. /usr/local/bin.
Windows users will need to ensure Python is on their PATH, and invoke
rabbitmqadmin as python.exe rabbitmqadmin.
Invoke rabbitmqadmin —help for usage instructions. You can:
- list exchanges, queues, bindings, vhosts, users, permissions, connections and channels
- show overview information
- declare and delete exchanges, queues, bindings, vhosts, users and permissions
- publish and get messages
- close connections and purge queues
- import and export configuration
For other tasks, see rabbitmqctl and
rabbitmq-plugins.
Non-mirrored Queue Behavior in a Cluster
This guide focuses on mirrored queues, however, it is important
to briefly explain how non-mirrored queues behave in a cluster in contrast
with mirrored ones.
If leader node of a queue (the node running queue leader) is available,
all queue operations (e.g. declaration, binding and consumer management, message routing
to the queue) can be performed on any node. Cluster nodes will route
operations to the leader node transparently to the clients.
If leader node of a queue
becomes unavailable, the behaviour of a non-mirrored queue
depends on its durability. A durable queue will become
unavailable until the node comes back.
All operations on a durable queue with unavailable leader node
will fail with a message in server logs that looks like this:
operation queue.declare caused a channel exception not_found: home node 'rabbit@hostname' of durable queue 'queue-name' in vhost '/' is down or inaccessible
A non-durable one will be deleted.
In case it is desired that the queue remains available at all times,
mirrors can be configured to be .