Server Specific Parameters
Repid is designed to be highly unopinionated and framework-agnostic. However, different message brokers (like RabbitMQ, Redis, or Google Cloud Pub/Sub) often have unique features that don't map cleanly to a universal abstraction.
For these situations, Repid provides an "escape hatch" known as server_specific_parameters.
Publishing with Parameters
When you use send_message, send_message_json, or the eagerly-returned message.reply() and
message.send_message(), you can pass a server_specific_parameters dictionary.
await app.send_message_json(
channel="tasks",
payload={"task": "process_video"},
headers={"topic": "video_worker"},
server_specific_parameters={
# Your broker-specific flags go here
}
)
The keys and values inside this dictionary are passed directly to the underlying connection
implementation (ServerT.publish). If a parameter is not recognized by the active server, it is
usually safely ignored, though behavior depends on the specific connection library.
Supported Parameters by Broker
Here is a list of parameters officially supported by Repid's built-in servers:
RabbitMQ (AMQP)
When using the AmqpServer, you can customize how the internal AMQP 1.0 protocol publishes the
message by providing instances of AMQP AmqpMessageHeader or AmqpMessageProperties objects.
from repid import AmqpServer
from repid.connections.amqp import AmqpMessageHeader, AmqpMessageProperties
await app.send_message_json(
channel="tasks",
payload={"msg": "hello"},
server_specific_parameters={
"header": AmqpMessageHeader(
durable=True,
priority=10,
ttl=5000, # Time to live in ms
delivery_count=0
),
"properties": AmqpMessageProperties(
subject="task_subject",
reply_to="reply_queue",
group_id="group_1",
absolute_expiry_time=1700000000
),
"routing_key": "my.custom.routing.key"
}
)
The supported keys include:
header(AmqpHeader): Override the AMQP header. Includes properties likedurable,priority,ttl(Time to live in ms),first_acquirer, anddelivery_count.properties(AmqpProperties): Override standard AMQP properties. Includes properties likemessage_id,user_id,to,subject,reply_to,correlation_id,content_type,content_encoding,absolute_expiry_time,creation_time,group_id,group_sequence, andreply_to_group_id.delivery_annotations(dict[str, Any]): Add AMQP delivery annotations.message_annotations(dict[str, Any]): Add AMQP message annotations.footers(dict[str, Any]): Add AMQP footers.to(str): Explicitly override the AMQPtoaddress, bypassing Repid's default naming strategy.routing_key(str): Specify a custom routing key to use in combination with Repid's exchange naming strategy.
Redis Streams
When using the RedisServer, the parameters directly translate to XADD command options.
maxlen(int): Maximum stream length. This forces the stream to drop older entries if it exceeds this length.approximate(bool): Used in conjunction withmaxlen. IfTrue(default), uses~forMAXLENwhich is much more performant.nomkstream(bool): IfTrue, the message will not be added and an error will be thrown if the stream doesn't already exist.stream_id(str): Custom stream entry ID. Defaults to"*"(auto-generate).
Google Cloud Pub/Sub
When using the PubsubServer, you can customize the gRPC PublishRequest.
topic(str): Override the target topic name (bypasses default prefixing).project(str): Override the Google Cloud project ID for this specific publish.timeout(int | float | timedelta): Override the timeout for the gRPC publish call.ordering_key(str): A string identifying related messages for which publish order should be respected.attributes(dict[str, str]): Extra metadata attributes to attach directly to the Pub/Sub message envelope.
In-Memory
When using the InMemoryServer for testing or local development:
message_id(str): Explicitly set the unique ID of the dummy message in the queue. If omitted, a random UUID is generated.