W3cubDocs

/Phoenix

Phoenix.PubSub

Front-end to Phoenix pubsub layer.

Used internally by Channels for pubsub broadcast but also provides an API for direct usage.

Adapters

Phoenix pubsub was designed to be flexible and support multiple backends. We currently ship with two backends:

  • Phoenix.PubSub.PG2 - uses Distributed Elixir, directly exchanging notifications between servers

  • Phoenix.PubSub.Redis - uses Redis to exchange data between servers

Pubsub adapters are often configured in your endpoint:

config :my_app, MyApp.Endpoint,
  pubsub: [adapter: Phoenix.PubSub.PG2,
           pool_size: 1,
           name: MyApp.PubSub]

The configuration above takes care of starting the pubsub backend and exposing its functions via the endpoint module. If no adapter but a name is given, nothing will be started, but the pubsub system will work by sending events and subscribing to the given name.

Direct usage

It is also possible to use Phoenix.PubSub directly or even run your own pubsub backends outside of an Endpoint.

The first step is to start the adapter of choice in your supervision tree:

supervisor(Phoenix.PubSub.Redis, [:my_pubsub, host: "192.168.100.1"])

The configuration above will start a Redis pubsub and register it with name :my_pubsub.

You can now use the functions in this module to subscribe and broadcast messages:

iex> PubSub.subscribe :my_pubsub, self, "user:123"
:ok
iex> Process.info(self)[:messages]
[]
iex> PubSub.broadcast :my_pubsub, "user:123", {:user_update, %{id: 123, name: "Shane"}}
:ok
iex> Process.info(self)[:messages]
{:user_update, %{id: 123, name: "Shane"}}

Implementing your own adapter

PubSub adapters run inside their own supervision tree. If you are interested in providing your own adapter, let's call it Phoenix.PubSub.MyQueue, the first step is to provide a supervisor module that receives the server name and a bunch of options on start_link/2:

defmodule Phoenix.PubSub.MyQueue do
  def start_link(name, options) do
    Supervisor.start_link(__MODULE__, {name, options},
                          name: Module.concat(name, Supervisor))
  end

  def init({name, options}) do
    ...
  end
end

On init/1, you will define the supervision tree and use the given name to register the main pubsub process locally. This process must be able to handle the following GenServer calls:

  • subscribe - subscribes the given pid to the given topic sends: {:subscribe, pid, topic, opts} respond with: :ok | {:error, reason} | {:perform, {m, f, a}}

  • unsubscribe - unsubscribes the given pid from the given topic sends: {:unsubscribe, pid, topic} respond with: :ok | {:error, reason} | {:perform, {m, f, a}}

  • broadcast - broadcasts a message on the given topic sends: {:broadcast, :none | pid, topic, message} respond with: :ok | {:error, reason} | {:perform, {m, f, a}}

Offloading work to clients via MFA response

The Phoenix.PubSub API allows any of its functions to handle a response from the adapter matching {:perform, {m, f, a}}. The PubSub client will recursively invoke all MFA responses until a result is returned. This is useful for offloading work to clients without blocking your PubSub adapter. See Phoenix.PubSub.PG2 implementation for examples.

Summary

Types

node_name()

Functions

broadcast(server, topic, message)

Broadcasts message on given topic

broadcast!(server, topic, message)

Broadcasts message on given topic

broadcast_from(server, from_pid, topic, message)

Broadcasts message to all but from_pid on given topic. See Phoenix.PubSub.broadcast/3 for usage details

broadcast_from!(server, from_pid, topic, message)

Broadcasts message to all but from_pid on given topic

direct_broadcast(node_name, server, topic, message)

Broadcasts message on given topic, to a single node

direct_broadcast!(node_name, server, topic, message)

Broadcasts message on given topic, to a single node

direct_broadcast_from(node_name, server, from_pid, topic, message)

Broadcasts message to all but from_pid on given topic, to a single node. See Phoenix.PubSub.broadcast/3 for usage details

direct_broadcast_from!(node_name, server, from_pid, topic, message)

Broadcasts message to all but from_pid on given topic, to a single node

node_name(server)

Returns the node name of the PubSub server

subscribe(server, topic)
subscribe(server, pid, topic)

Subscribes the caller to the PubSub adapter's topic

subscribe(server, pid, topic, opts)
unsubscribe(server, topic)
unsubscribe(server, pid, topic)

Unsubscribes the caller from the PubSub adapter's topic

Types

node_name :: atom :: binary

Functions

broadcast(server, topic, message)

Specs

broadcast(atom, binary, term) ::
  :ok |
  {:error, term}

Broadcasts message on given topic.

  • server - The Pid or registered server name and optional node to scope the broadcast, for example: MyApp.PubSub, {MyApp.PubSub, :a@node}
  • topic - The topic to broadcast to, ie: "users:123"
  • message - The payload of the broadcast

broadcast!(server, topic, message)

Specs

broadcast!(atom, binary, term) :: :ok | no_return

Broadcasts message on given topic.

Raises Phoenix.PubSub.BroadcastError if broadcast fails. See Phoenix.PubSub.broadcast/3 for usage details.

broadcast_from(server, from_pid, topic, message)

Specs

broadcast_from(atom, pid, binary, term) ::
  :ok |
  {:error, term}

Broadcasts message to all but from_pid on given topic. See Phoenix.PubSub.broadcast/3 for usage details.

broadcast_from!(server, from_pid, topic, message)

Specs

broadcast_from!(atom | {atom, atom}, pid, binary, term) ::
  :ok |
  no_return

Broadcasts message to all but from_pid on given topic.

Raises Phoenix.PubSub.BroadcastError if broadcast fails. See Phoenix.PubSub.broadcast/3 for usage details.

direct_broadcast(node_name, server, topic, message)

Specs

direct_broadcast(node_name, atom, binary, term) ::
  :ok |
  {:error, term}

Broadcasts message on given topic, to a single node.

  • node - The name of the node to broadcast the message on
  • server - The Pid or registered server name and optional node to scope the broadcast, for example: MyApp.PubSub, {MyApp.PubSub, :a@node}
  • topic - The topic to broadcast to, ie: "users:123"
  • message - The payload of the broadcast

direct_broadcast!(node_name, server, topic, message)

Specs

direct_broadcast!(node_name, atom, binary, term) ::
  :ok |
  no_return

Broadcasts message on given topic, to a single node.

Raises Phoenix.PubSub.BroadcastError if broadcast fails. See Phoenix.PubSub.broadcast/3 for usage details.

direct_broadcast_from(node_name, server, from_pid, topic, message)

Specs

direct_broadcast_from(node_name, atom, pid, binary, term) ::
  :ok |
  {:error, term}

Broadcasts message to all but from_pid on given topic, to a single node. See Phoenix.PubSub.broadcast/3 for usage details.

direct_broadcast_from!(node_name, server, from_pid, topic, message)

Specs

direct_broadcast_from!(node_name, atom, pid, binary, term) ::
  :ok |
  no_return

Broadcasts message to all but from_pid on given topic, to a single node.

Raises Phoenix.PubSub.BroadcastError if broadcast fails. See Phoenix.PubSub.broadcast/3 for usage details.

node_name(server)

Specs

node_name(atom) :: atom :: binary

Returns the node name of the PubSub server.

subscribe(server, topic)

Specs

subscribe(atom, binary) :: :ok | {:error, term}

subscribe(server, pid, topic)

Specs

subscribe(atom, binary, Keyword.t) ::
  :ok |
  {:error, term}

Subscribes the caller to the PubSub adapter's topic.

  • server - The Pid registered name of the server
  • topic - The topic to subscribe to, ie: "users:123"
  • opts - The optional list of options. See below.

Duplicate Subscriptions

Callers should only subscribe to a given topic a single time. Duplicate subscriptions for a Pid/topic pair are allowed and will cause duplicate events to be sent; however, when using Phoenix.PubSub.unsubscribe/3, all duplicate subscriptions will be dropped.

Options

  • :link - links the subscriber to the pubsub adapter
  • :fastlane - Provides a fastlane path for the broadcasts for %Phoenix.Socket.Broadcast{} events. The fastlane process is notified of a cached message instead of the normal subscriber. Fastlane handlers must implement fastlane/1 callbacks which accepts a Phoenix.Socket.Broadcast structs and returns a fastlaned format for the handler. For example:

    PubSub.subscribe(MyApp.PubSub, "topic1",
      fastlane: {fast_pid, Phoenix.Transports.WebSocketSerializer, ["event1"]})

subscribe(server, pid, topic, opts)

Specs

subscribe(atom, pid, binary, Keyword.t) ::
  :ok |
  {:error, term}
subscribe(atom, pid, binary, Keyword.t) ::
  :ok |
  {:error, term}

unsubscribe(server, topic)

Specs

unsubscribe(atom, binary) :: :ok | {:error, term}

unsubscribe(server, pid, topic)

Specs

unsubscribe(atom, pid, binary) ::
  :ok |
  {:error, term}

Unsubscribes the caller from the PubSub adapter's topic.

© 2014 Chris McCord
Licensed under the MIT License.
https://hexdocs.pm/phoenix_pubsub/Phoenix.PubSub.html