The distributed RPC framework provides mechanisms for multi-machine model training through a set of primitives to allow for remote communication, and a higher-level API to automatically differentiate models split across several machines.
Warning
APIs in the RPC package are stable. There are multiple ongoing work items to improve performance and error handling, which will ship in future releases.
Note
Please refer to PyTorch Distributed Overview for a brief introduction to all features related to distributed training.
The distributed RPC framework makes it easy to run functions remotely, supports referencing remote objects without copying the real data around, and provides autograd and optimizer APIs to transparently run backward and update parameters across RPC boundaries. These features can be categorized into four sets of APIs.
1. Remote Procedure Call (RPC) supports running a function on a specified destination worker with the given arguments and getting the return value back, or creating a reference to the return value. There are three main RPC APIs: rpc_sync() (synchronous), rpc_async() (asynchronous), and remote() (asynchronous, returning a reference to the remote return value). Use the synchronous API if the user code cannot proceed without the return value. Otherwise, use the asynchronous API to get a future, and wait on the future when the return value is needed on the caller. The remote() API is useful when something needs to be created remotely but never fetched to the caller. Imagine the case where a driver process is setting up a parameter server and a trainer: the driver can create an embedding table on the parameter server and then share a reference to the embedding table with the trainer, but it will never use the embedding table locally. In this case, rpc_sync() and rpc_async() are no longer appropriate, as they always imply that the return value will be returned to the caller immediately or in the future.

2. Remote Reference (RRef) serves as a distributed shared pointer to a local or remote object. It can be shared with other workers, and reference counting is handled transparently. Each RRef has exactly one owner, and the object only lives on that owner. Non-owner workers holding RRefs can get copies of the object from the owner by explicitly requesting them. This is useful when a worker needs to access a data object that it neither created (i.e., it was not the caller of remote()) nor owns. The distributed optimizer, as we will discuss below, is one example of such use cases.

3. Distributed Autograd stitches together the local autograd engines on all the workers involved in the forward pass, and automatically reaches out to them during the backward pass to compute gradients. This is especially helpful when the forward pass spans multiple machines, e.g., in distributed model parallel or parameter-server training, as applications then no longer need to worry about sending gradients over RPC boundaries or about the order in which the local autograd engines are launched.

4. Distributed Optimizer's constructor takes an Optimizer() (e.g., SGD(), Adagrad(), etc.) and a list of parameter RRefs, creates an Optimizer() instance on each distinct RRef owner, and updates parameters accordingly when running step(). With distributed forward and backward passes, parameters and gradients are scattered across multiple workers, so an optimizer is required on each of the involved workers. The Distributed Optimizer wraps all those local optimizers into one, and provides a concise constructor and step() API.

Before using RPC and distributed autograd primitives, initialization must take place. To initialize the RPC framework, use init_rpc(), which initializes the RPC framework, the RRef framework, and distributed autograd.
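For instance, the parameter-server scenario from item 1 could be sketched as follows. This is only an illustrative sketch, not library code: the EmbeddingTable module, the run_trainer function, and the worker names are hypothetical, and the init_rpc()/shutdown() calls on each process (covered next) are omitted.

>>> # Illustrative sketch only; EmbeddingTable, run_trainer, and the worker
>>> # names are hypothetical, and init_rpc()/shutdown() calls are omitted.
>>> import torch
>>> import torch.nn as nn
>>> import torch.distributed.rpc as rpc
>>>
>>> class EmbeddingTable(nn.Module):
>>>     def __init__(self, num_embeddings=100, dim=16):
>>>         super().__init__()
>>>         self.emb = nn.Embedding(num_embeddings, dim)
>>>
>>>     def forward(self, indices):
>>>         return self.emb(indices)
>>>
>>> # On the driver: create the table on the parameter server without ever
>>> # fetching it locally, then hand the RRef to the trainer.
>>> emb_rref = rpc.remote("parameter_server", EmbeddingTable)
>>> rpc.rpc_sync("trainer", run_trainer, args=(emb_rref,))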
torch.distributed.rpc.init_rpc(name, backend=None, rank=-1, world_size=None, rpc_backend_options=None)
[source]
Initializes RPC primitives such as the local RPC agent and distributed autograd, which immediately makes the current process ready to send and receive RPCs.
Parameters:
- name (str) – a globally unique name of this node (e.g., Trainer3, ParameterServer2, Master, Worker1). The name can only contain numbers, letters, underscores, colons, and/or dashes, and must be shorter than 128 characters.
- backend (BackendType, optional) – the type of RPC backend implementation. Supported values are BackendType.TENSORPIPE (the default) and BackendType.PROCESS_GROUP. See Backends for more information.
- rank (int) – a globally unique id/rank of this node.
- world_size (int) – the number of workers in the group.
- rpc_backend_options (RpcBackendOptions, optional) – the options passed to the RpcAgent constructor. It must be an agent-specific subclass of RpcBackendOptions and contains agent-specific initialization configurations. By default, for all agents, it sets the default timeout to 60 seconds and performs the rendezvous with an underlying process group initialized using init_method = "env://", meaning that the environment variables MASTER_ADDR and MASTER_PORT need to be set properly. See Backends for more information and to find which options are available.

The following APIs allow users to remotely execute functions as well as create references (RRefs) to remote data objects. In these APIs, when passing a Tensor as an argument or a return value, the destination worker will try to create a Tensor with the same meta (i.e., shape, stride, etc.). We intentionally disallow transmitting CUDA tensors because it might crash if the device lists on the source and destination workers do not match. In such cases, applications can always explicitly move the input tensors to CPU on the caller and move them to the desired devices on the callee if necessary.
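For example, a minimal sketch of initializing RPC while handling GPU data under this restriction might look as follows; the worker names, port, and the gpu_add helper are hypothetical, and the matching init_rpc()/shutdown() calls on worker1 are omitted.

>>> # Hypothetical sketch: worker names, port, and gpu_add are illustrative.
>>> import os
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> os.environ['MASTER_ADDR'] = 'localhost'
>>> os.environ['MASTER_PORT'] = '29500'
>>>
>>> def gpu_add(t1, t2):
>>>     # On the callee, move the received CPU tensors to a local device if
>>>     # one is available, compute, and return a CPU tensor.
>>>     device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
>>>     return (t1.to(device) + t2.to(device)).cpu()
>>>
>>> # On worker 0 (worker 1 runs its own init_rpc()/shutdown(), omitted here):
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> x = torch.ones(2, device='cuda:0') if torch.cuda.is_available() else torch.ones(2)
>>> ret = rpc.rpc_sync("worker1", gpu_add, args=(x.cpu(), torch.ones(2)))
>>> rpc.shutdown()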
Warning
TorchScript support in RPC is a prototype feature and subject to change. Since v1.5.0, torch.distributed.rpc
supports calling TorchScript functions as RPC target functions, and this will help improve parallelism on the callee side as executing TorchScript functions does not require GIL.
torch.distributed.rpc.rpc_sync(to, func, args=None, kwargs=None, timeout=-1.0)
[source]
Make a blocking RPC call to run function func
on worker to
. RPC messages are sent and received in parallel to execution of Python code. This method is thread-safe.
Parameters:
- to (str or WorkerInfo or int) – name/rank/WorkerInfo of the destination worker.
- func (callable) – a callable function, such as Python callables, builtin operators (e.g., add()), and annotated TorchScript functions.
- args (tuple) – the argument tuple for the func invocation.
- kwargs (dict) – a dictionary of keyword arguments for the func invocation.
- timeout (float, optional) – timeout in seconds to use for this RPC. If the RPC does not complete in this amount of time, an exception indicating so will be raised. A value of 0 indicates an infinite timeout, i.e., a timeout error will never be raised. If not provided, the default value set during initialization or with _set_rpc_timeout is used.

Returns: The result of running func with args and kwargs.
Warning
Using GPU tensors as arguments or return values of func
is not supported since we don’t support sending GPU tensors over the wire. You need to explicitly copy GPU tensors to CPU before using them as arguments or return values of func
.
Make sure that MASTER_ADDR
and MASTER_PORT
are set properly on both workers. Refer to init_process_group()
API for more details. For example,
>>> export MASTER_ADDR=localhost
>>> export MASTER_PORT=5678
Then run the following code in two different processes:
>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3))
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
Below is an example of running a TorchScript function using RPC.
>>> # On both workers:
>>> @torch.jit.script
>>> def my_script_add(t1, t2):
>>>     return torch.add(t1, t2)
>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> ret = rpc.rpc_sync("worker1", my_script_add, args=(torch.ones(2), 3))
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
torch.distributed.rpc.rpc_async(to, func, args=None, kwargs=None, timeout=-1.0)
[source]
Make a non-blocking RPC call to run function func
on worker to
. RPC messages are sent and received in parallel to execution of Python code. This method is thread-safe. This method will immediately return a Future
that can be awaited on.
Parameters:
- to (str or WorkerInfo or int) – name/rank/WorkerInfo of the destination worker.
- func (callable) – a callable function, such as Python callables, builtin operators (e.g., add()), and annotated TorchScript functions.
- args (tuple) – the argument tuple for the func invocation.
- kwargs (dict) – a dictionary of keyword arguments for the func invocation.
- timeout (float, optional) – timeout in seconds to use for this RPC. If the RPC does not complete in this amount of time, an exception indicating so will be raised. A value of 0 indicates an infinite timeout, i.e., a timeout error will never be raised. If not provided, the default value set during initialization or with _set_rpc_timeout is used.

Returns: A Future object that can be waited on. When completed, the return value of func on args and kwargs can be retrieved from the Future object.
Warning
Using GPU tensors as arguments or return values of func
is not supported since we don’t support sending GPU tensors over the wire. You need to explicitly copy GPU tensors to CPU before using them as arguments or return values of func
.
Warning
The rpc_async
API does not copy storages of argument tensors until sending them over the wire, which could be done by a different thread depending on the RPC backend type. The caller should make sure that the contents of those tensors stay intact until the returned Future
completes.
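For example, here is a minimal sketch of the pattern described in the warning above, assuming an already-initialized RPC group with a peer named worker1: avoid in-place mutation of argument tensors until the returned Future has completed.

>>> # Sketch only: assumes RPC is initialized and "worker1" is a peer.
>>> import torch
>>> import torch.distributed.rpc as rpc
>>>
>>> x = torch.ones(2)
>>> fut = rpc.rpc_async("worker1", torch.add, args=(x, 1))
>>> # Do not mutate `x` in place here; its storage may not have been sent yet.
>>> result = fut.wait()
>>> x.add_(10)  # safe to mutate now that the Future has completed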
Make sure that MASTER_ADDR
and MASTER_PORT
are set properly on both workers. Refer to init_process_group()
API for more details. For example,
>>> export MASTER_ADDR=localhost
>>> export MASTER_PORT=5678
Then run the following code in two different processes:
>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> fut1 = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 3))
>>> fut2 = rpc.rpc_async("worker1", min, args=(1, 2))
>>> result = fut1.wait() + fut2.wait()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
Below is an example of running a TorchScript function using RPC.
>>> # On both workers:
>>> @torch.jit.script
>>> def my_script_add(t1, t2):
>>>     return torch.add(t1, t2)
>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> fut = rpc.rpc_async("worker1", my_script_add, args=(torch.ones(2), 3))
>>> ret = fut.wait()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
torch.distributed.rpc.remote(to, func, args=None, kwargs=None, timeout=-1.0)
[source]
Make a remote call to run func
on worker to
and return an RRef
to the result value immediately. Worker to
will be the owner of the returned RRef
, and the worker calling remote
is a user. The owner manages the global reference count of its RRef
, and the owner RRef
is only destructed when globally there are no living references to it.
Parameters:
- to (str or WorkerInfo or int) – name/rank/WorkerInfo of the destination worker.
- func (callable) – a callable function, such as Python callables, builtin operators (e.g., add()), and annotated TorchScript functions.
- args (tuple) – the argument tuple for the func invocation.
- kwargs (dict) – a dictionary of keyword arguments for the func invocation.
- timeout (float, optional) – timeout in seconds for this remote call. If the creation of this RRef on worker to is not successfully processed on this worker within this timeout, then the next time there is an attempt to use the RRef (such as to_here()), a timeout will be raised indicating this failure. A value of 0 indicates an infinite timeout, i.e., a timeout error will never be raised. If not provided, the default value set during initialization or with _set_rpc_timeout is used.

Returns: A user RRef instance to the result value. Use the blocking API torch.distributed.rpc.RRef.to_here() to retrieve the result value locally.
Warning
Using GPU tensors as arguments or return values of func
is not supported since we don’t support sending GPU tensors over the wire. You need to explicitly copy GPU tensors to CPU before using them as arguments or return values of func
.
Warning
The remote
API does not copy storages of argument tensors until sending them over the wire, which could be done by a different thread depending on the RPC backend type. The caller should make sure that the contents of those tensors stay intact until the returned RRef is confirmed by the owner, which can be checked using the torch.distributed.rpc.RRef.confirmed_by_owner()
API.
Warning
Errors such as timeouts for the remote API are handled on a best-effort basis. When a call initiated by remote fails, for example with a timeout error, the error is handled and set on the resulting RRef asynchronously. If the RRef has not yet been used by the application (such as in a to_here or fork call) before this handling, subsequent uses of the RRef will raise the error appropriately. However, it is possible that the user application uses the RRef before the error is handled; in this case, the error may not be raised because it has not yet been set.
Make sure that MASTER_ADDR
and MASTER_PORT
are set properly on both workers. Refer to init_process_group()
API for more details. For example,
>>> export MASTER_ADDR=localhost
>>> export MASTER_PORT=5678
Then run the following code in two different processes:
>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))
>>> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1))
>>> x = rref1.to_here() + rref2.to_here()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
Below is an example of running a TorchScript function using RPC.
>>> # On both workers:
>>> @torch.jit.script
>>> def my_script_add(t1, t2):
>>>     return torch.add(t1, t2)
>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> rref = rpc.remote("worker1", my_script_add, args=(torch.ones(2), 3))
>>> rref.to_here()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
torch.distributed.rpc.get_worker_info(worker_name=None)
[source]
Get WorkerInfo
of a given worker name. Use this WorkerInfo
to avoid passing an expensive string on every invocation.
Parameters:
- worker_name (str) – the string name of a worker. If None, return the WorkerInfo of the current worker. (default None)

Returns: A WorkerInfo instance for the given worker_name, or the WorkerInfo of the current worker if worker_name is None.
torch.distributed.rpc.shutdown(graceful=True)
[source]
Perform a shutdown of the RPC agent, and then destroy the RPC agent. This stops the local agent from accepting outstanding requests, and shuts down the RPC framework by terminating all RPC threads. If graceful=True
, this will block until all local and remote RPC processes reach this method and wait for all outstanding work to complete. Otherwise, if graceful=False
, this is a local shutdown, and it does not wait for other RPC processes to reach this method.
Warning
For Future
objects returned by rpc_async()
, future.wait()
should not be called after shutdown()
.
Parameters:
- graceful (bool) – Whether to do a graceful shutdown or not. If True, this will 1) wait until there are no pending system messages for UserRRefs and delete them; 2) block until all local and remote RPC processes have reached this method and wait for all outstanding work to complete.
Make sure that MASTER_ADDR
and MASTER_PORT
are set properly on both workers. Refer to init_process_group()
API for more details. For example,
>>> export MASTER_ADDR=localhost
>>> export MASTER_PORT=5678
Then run the following code in two different processes:
>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> # do some work
>>> result = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(1), 1))
>>> # ready to shutdown
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> # wait for worker 0 to finish work, and then shutdown.
>>> rpc.shutdown()
class torch.distributed.rpc.WorkerInfo
A structure that encapsulates information of a worker in the system. It contains the name and ID of the worker. This class is not meant to be constructed directly; rather, an instance can be retrieved through get_worker_info(), and the result can be passed in to functions such as rpc_sync(), rpc_async(), and remote() to avoid copying a string on every invocation.
property id
Globally unique id to identify the worker.
property name
The name of the worker.
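For example, a minimal sketch of using get_worker_info() and WorkerInfo, assuming RPC has already been initialized and a peer named worker1 exists:

>>> # Sketch only: assumes RPC is initialized and "worker1" is a peer.
>>> import torch
>>> import torch.distributed.rpc as rpc
>>>
>>> info = rpc.get_worker_info("worker1")
>>> print(info.name, info.id)
>>> # The WorkerInfo can be reused as the destination to avoid passing the
>>> # worker name string on every invocation.
>>> ret = rpc.rpc_sync(info, torch.add, args=(torch.ones(2), 1))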
The RPC package also provides decorators which allow applications to specify how a given function should be treated on the callee side.
torch.distributed.rpc.functions.async_execution(fn)
[source]
A decorator for a function indicating that the return value of the function is guaranteed to be a Future
object and this function can run asynchronously on the RPC callee. More specifically, the callee extracts the Future
returned by the wrapped function and installs subsequent processing steps as a callback to that Future
. The installed callback will read the value from the Future
when completed and send the value back as the RPC response. That also means the returned Future
only exists on the callee side and is never sent through RPC. This decorator is useful when the wrapped function’s (fn
) execution needs to pause and resume due to, e.g., containing rpc_async()
or waiting for other signals.
Note
To enable asynchronous execution, applications must pass the function object returned by this decorator to RPC APIs. If RPC detects attributes installed by this decorator, it knows that this function returns a Future object and will handle it accordingly. However, this does not mean this decorator has to be the outermost one when defining a function. For example, when combined with @staticmethod or @classmethod, @rpc.functions.async_execution needs to be the inner decorator to allow the target function to be recognized as a static or class function. This target function can still execute asynchronously because, when accessed, the static or class method preserves attributes installed by @rpc.functions.async_execution.
The returned Future
object can come from rpc_async()
, then()
, or Future
constructor. The example below shows directly using the Future
returned by then()
.
>>> from torch.distributed import rpc
>>>
>>> # omitting setup and shutdown RPC
>>>
>>> # On all workers
>>> @rpc.functions.async_execution
>>> def async_add_chained(to, x, y, z):
>>>     # This function runs on "worker1" and returns immediately when
>>>     # the callback is installed through the `then(cb)` API. In the
>>>     # mean time, the `rpc_async` to "worker2" can run concurrently.
>>>     # When the return value of that `rpc_async` arrives at
>>>     # "worker1", "worker1" will run the lambda function accordingly
>>>     # and set the value for the previously returned `Future`, which
>>>     # will then trigger RPC to send the result back to "worker0".
>>>     return rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>         lambda fut: fut.wait() + z
>>>     )
>>>
>>> # On worker0
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     async_add_chained,
>>>     args=("worker2", torch.ones(2), 1, 1)
>>> )
>>> print(ret)  # prints tensor([3., 3.])
When combined with TorchScript decorators, this decorator must be the outermost one.
>>> from torch import Tensor
>>> from torch.futures import Future
>>> from torch.distributed import rpc
>>>
>>> # omitting setup and shutdown RPC
>>>
>>> # On all workers
>>> @torch.jit.script
>>> def script_add(x: Tensor, y: Tensor) -> Tensor:
>>>     return x + y
>>>
>>> @rpc.functions.async_execution
>>> @torch.jit.script
>>> def async_add(to: str, x: Tensor, y: Tensor) -> Future[Tensor]:
>>>     return rpc.rpc_async(to, script_add, (x, y))
>>>
>>> # On worker0
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     async_add,
>>>     args=("worker2", torch.ones(2), 1)
>>> )
>>> print(ret)  # prints tensor([2., 2.])
When combined with a static or class method, this decorator must be the inner one.
>>> from torch.distributed import rpc
>>>
>>> # omitting setup and shutdown RPC
>>>
>>> # On all workers
>>> class AsyncExecutionClass:
>>>
>>>     @staticmethod
>>>     @rpc.functions.async_execution
>>>     def static_async_add(to, x, y, z):
>>>         return rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>             lambda fut: fut.wait() + z
>>>         )
>>>
>>>     @classmethod
>>>     @rpc.functions.async_execution
>>>     def class_async_add(cls, to, x, y, z):
>>>         ret_fut = torch.futures.Future()
>>>         rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>             lambda fut: ret_fut.set_result(fut.wait() + z)
>>>         )
>>>         return ret_fut
>>>
>>>     @rpc.functions.async_execution
>>>     def bound_async_add(self, to, x, y, z):
>>>         return rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>             lambda fut: fut.wait() + z
>>>         )
>>>
>>> # On worker0
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     AsyncExecutionClass.static_async_add,
>>>     args=("worker2", torch.ones(2), 1, 2)
>>> )
>>> print(ret)  # prints tensor([4., 4.])
>>>
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     AsyncExecutionClass.class_async_add,
>>>     args=("worker2", torch.ones(2), 1, 2)
>>> )
>>> print(ret)  # prints tensor([4., 4.])
This decorator also works with RRef helpers, i.e., torch.distributed.rpc.RRef.rpc_sync()
, torch.distributed.rpc.RRef.rpc_async()
, and torch.distributed.rpc.RRef.remote()
.
>>> from torch.distributed import rpc
>>>
>>> # reuse the AsyncExecutionClass class above
>>> rref = rpc.remote("worker1", AsyncExecutionClass)
>>> ret = rref.rpc_sync().static_async_add("worker2", torch.ones(2), 1, 2)
>>> print(ret)  # prints tensor([4., 4.])
>>>
>>> rref = rpc.remote("worker1", AsyncExecutionClass)
>>> ret = rref.rpc_async().static_async_add("worker2", torch.ones(2), 1, 2).wait()
>>> print(ret)  # prints tensor([4., 4.])
>>>
>>> rref = rpc.remote("worker1", AsyncExecutionClass)
>>> ret = rref.remote().static_async_add("worker2", torch.ones(2), 1, 2).to_here()
>>> print(ret)  # prints tensor([4., 4.])
The RPC module can leverage different backends to perform the communication between the nodes. The backend to be used can be specified in the init_rpc()
function, by passing a certain value of the BackendType
enum. Regardless of what backend is used, the rest of the RPC API won’t change. Each backend also defines its own subclass of the RpcBackendOptions
class, an instance of which can also be passed to init_rpc()
to configure the backend’s behavior.
class torch.distributed.rpc.BackendType
An enum class of available backends.
PyTorch ships with two builtin backends: BackendType.TENSORPIPE
and BackendType.PROCESS_GROUP
. Additional ones can be registered using the register_backend()
function.
class torch.distributed.rpc.RpcBackendOptions
An abstract structure encapsulating the options passed into the RPC backend. An instance of this class can be passed in to init_rpc()
in order to initialize RPC with specific configurations, such as the RPC timeout and init_method
to be used.
property init_method
URL specifying how to initialize the process group. Default is env://
property rpc_timeout
A float indicating the timeout to use for all RPCs. If an RPC does not complete in this timeframe, it will complete with an exception indicating that it has timed out.
The TensorPipe agent, which is the default, leverages the TensorPipe library, which provides a natively asynchronous point-to-point communication primitive specifically suited for machine learning that fundamentally addresses some of the limitations of Gloo. Compared to Gloo, it has the advantage of being asynchronous, which allows a large number of transfers to occur simultaneously, each at their own speed, without blocking each other. It will only open pipes between pairs of nodes when needed, on demand, and when one node fails only its incident pipes will be closed, while all other ones will keep working as normal. In addition, it is able to support multiple different transports (TCP, of course, but also shared memory, NVLink, InfiniBand, …) and can automatically detect their availability and negotiate the best transport to use for each pipe.
The TensorPipe backend has been introduced in PyTorch v1.6 and is being actively developed. At the moment, it only supports CPU tensors, with GPU support coming soon. It comes with a TCP-based transport, just like Gloo. It is also able to automatically chunk and multiplex large tensors over multiple sockets and threads in order to achieve very high bandwidths. The agent will be able to pick the best transport on its own, with no intervention required.
Example:
>>> import os
>>> from torch.distributed import rpc
>>> os.environ['MASTER_ADDR'] = 'localhost'
>>> os.environ['MASTER_PORT'] = '29500'
>>>
>>> rpc.init_rpc(
>>>     "worker1",
>>>     rank=0,
>>>     world_size=2,
>>>     rpc_backend_options=rpc.TensorPipeRpcBackendOptions(
>>>         num_worker_threads=8,
>>>         rpc_timeout=20  # 20 second timeout
>>>     )
>>> )
>>>
>>> # omitting init_rpc invocation on worker2
class torch.distributed.rpc.TensorPipeRpcBackendOptions(*, num_worker_threads: int = 16, rpc_timeout: float = 60.0, init_method: str = 'env://', _transports: List = None, _channels: List = None)
[source]
The backend options for TensorPipeAgent
, derived from RpcBackendOptions
.
Parameters:
- num_worker_threads (int, optional) – The number of threads in the thread-pool used by TensorPipeAgent to execute requests (default: 16).
- rpc_timeout (float, optional) – The default timeout, in seconds, for RPC requests (default: 60 seconds). If the RPC does not complete in this timeframe, an exception indicating so will be raised. Callers can override this timeout for individual RPCs in rpc_sync() and rpc_async() if necessary.
- init_method (str, optional) – The URL to initialize the distributed store used for rendezvous. It takes any value accepted for the same argument of init_process_group() (default: env://).

property init_method
URL specifying how to initialize the process group. Default is env://
property num_worker_threads
The number of threads in the thread-pool used by TensorPipeAgent
to execute requests.
property rpc_timeout
A float indicating the timeout to use for all RPCs. If an RPC does not complete in this timeframe, it will complete with an exception indicating that it has timed out.
Warning
The Process Group Backend will be deprecated soon; we recommend using the TensorPipe Backend instead.
The Process Group agent instantiates a process group from the distributed
module and utilizes its point-to-point communication capabilities to send RPC messages. Internally, the process group uses the Gloo library.
Gloo has been hardened by years of extensive use in PyTorch and is thus very reliable. However, as it was designed to perform collective communication, it may not always be the best fit for RPC. For example, each networking operation is synchronous and blocking, which means that it cannot be run in parallel with others. Moreover, it opens a connection between all pairs of nodes, and brings down all of them when one fails, thus reducing the resiliency and the elasticity of the system.
Example:
>>> import os
>>> from torch.distributed import rpc
>>> os.environ['MASTER_ADDR'] = 'localhost'
>>> os.environ['MASTER_PORT'] = '29500'
>>>
>>> rpc.init_rpc(
>>>     "worker1",
>>>     rank=0,
>>>     world_size=2,
>>>     backend=rpc.BackendType.PROCESS_GROUP,
>>>     rpc_backend_options=rpc.ProcessGroupRpcBackendOptions(
>>>         num_send_recv_threads=16,
>>>         rpc_timeout=20  # 20 second timeout
>>>     )
>>> )
>>>
>>> # omitting init_rpc invocation on worker2
class torch.distributed.rpc.ProcessGroupRpcBackendOptions
The backend options class for ProcessGroupAgent
, which is derived from RpcBackendOptions
.
Parameters:
- num_send_recv_threads (int, optional) – The number of threads in the thread-pool used by ProcessGroupAgent (default: 4).
- rpc_timeout (float, optional) – The default timeout, in seconds, for RPC requests (default: 60 seconds). If the RPC does not complete in this timeframe, an exception indicating so will be raised. Callers can override this timeout for individual RPCs in rpc_sync() and rpc_async() if necessary.
- init_method (str, optional) – The URL to initialize ProcessGroupGloo (default: env://).

property init_method
URL specifying how to initialize the process group. Default is env://
property num_send_recv_threads
The number of threads in the thread-pool used by ProcessGroupAgent.
property rpc_timeout
A float indicating the timeout to use for all RPCs. If an RPC does not complete in this timeframe, it will complete with an exception indicating that it has timed out.
An RRef
(Remote REFerence) is a reference to a value of some type T
(e.g. Tensor
) on a remote worker. This handle keeps the referenced remote value alive on the owner, but there is no implication that the value will be transferred to the local worker in the future. RRefs can be used in multi-machine training by holding references to nn.Modules that exist on other workers, and calling the appropriate functions to retrieve or modify their parameters during training. See Remote Reference Protocol for more details.
class torch.distributed.rpc.RRef
[source]
confirmed_by_owner(self: torch.distributed.rpc.RRef) → bool
Returns whether this RRef
has been confirmed by the owner. OwnerRRef
always returns true, while UserRRef
only returns true when the owner knows about this UserRRef
.
is_owner(self: torch.distributed.rpc.RRef) → bool
Returns whether or not the current node is the owner of this RRef
.
local_value(self: torch.distributed.rpc.RRef) → object
If the current node is the owner, returns a reference to the local value. Otherwise, throws an exception.
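For example, a minimal sketch that combines is_owner() and local_value(), assuming an initialized RPC group with a peer named worker1; scale_in_place is a hypothetical helper:

>>> # Sketch only: scale_in_place is a hypothetical helper function.
>>> import torch
>>> import torch.distributed.rpc as rpc
>>>
>>> def scale_in_place(rref, factor):
>>>     assert rref.is_owner()  # local_value() may only be called on the owner
>>>     rref.local_value().mul_(factor)
>>>
>>> rref = rpc.remote("worker1", torch.ones, args=(2, 2))
>>> rpc.rpc_sync(rref.owner(), scale_in_place, args=(rref, 3.0))
>>> print(rref.to_here())  # tensor of threes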
owner(self: torch.distributed.rpc.RRef) → torch.distributed.rpc.WorkerInfo
Returns worker information of the node that owns this RRef
.
owner_name(self: torch.distributed.rpc.RRef) → str
Returns worker name of the node that owns this RRef
.
remote(self: torch.distributed.rpc.RRef) → object
Create a helper proxy to easily launch a remote
using the owner of the RRef as the destination to run functions on the object referenced by this RRef. More specifically, rref.remote().func_name(*args, **kwargs)
is the same as the following:
>>> def run(rref, func_name, args, kwargs):
>>>     return getattr(rref.local_value(), func_name)(*args, **kwargs)
>>>
>>> rpc.remote(rref.owner(), run, args=(rref, func_name, args, kwargs))
>>> from torch.distributed import rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1))
>>> rref.remote().size().to_here()  # returns torch.Size([2, 2])
>>> rref.remote().view(1, 4).to_here()  # returns tensor([[1., 1., 1., 1.]])
rpc_async(self: torch.distributed.rpc.RRef) → object
Create a helper proxy to easily launch an rpc_async
using the owner of the RRef as the destination to run functions on the object referenced by this RRef. More specifically, rref.rpc_async().func_name(*args, **kwargs)
is the same as the following:
>>> def run(rref, func_name, args, kwargs):
>>>     return getattr(rref.local_value(), func_name)(*args, **kwargs)
>>>
>>> rpc.rpc_async(rref.owner(), run, args=(rref, func_name, args, kwargs))
>>> from torch.distributed import rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1))
>>> rref.rpc_async().size().wait()  # returns torch.Size([2, 2])
>>> rref.rpc_async().view(1, 4).wait()  # returns tensor([[1., 1., 1., 1.]])
rpc_sync(self: torch.distributed.rpc.RRef) → object
Create a helper proxy to easily launch an rpc_sync
using the owner of the RRef as the destination to run functions on the object referenced by this RRef. More specifically, rref.rpc_sync().func_name(*args, **kwargs)
is the same as the following:
>>> def run(rref, func_name, args, kwargs):
>>>     return getattr(rref.local_value(), func_name)(*args, **kwargs)
>>>
>>> rpc.rpc_sync(rref.owner(), run, args=(rref, func_name, args, kwargs))
>>> from torch.distributed import rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1))
>>> rref.rpc_sync().size()  # returns torch.Size([2, 2])
>>> rref.rpc_sync().view(1, 4)  # returns tensor([[1., 1., 1., 1.]])
to_here(self: torch.distributed.rpc.RRef, timeout: float = -1.0) → object
Blocking call that copies the value of the RRef from the owner to the local node and returns it. If the current node is the owner, returns a reference to the local value.
timeout (float, optional) – Timeout for to_here
. If the call does not complete within this timeframe, an exception indicating so will be raised. If this argument is not provided, the default RPC timeout (60s) will be used.
This module provides an RPC-based distributed autograd framework that can be used for applications such as model parallel training. In short, applications may send and receive gradient recording tensors over RPC. In the forward pass, we record when gradient recording tensors are sent over RPC and during the backward pass we use this information to perform a distributed backward pass using RPC. For more details see Distributed Autograd Design.
class torch.distributed.autograd.context
[source]
Context object to wrap forward and backward passes when using distributed autograd. The context_id
generated in the with
statement is required to uniquely identify a distributed backward pass on all workers. Each worker stores metadata associated with this context_id
, which is required to correctly execute a distributed autograd pass.
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     t1 = torch.rand((3, 3), requires_grad=True)
>>>     t2 = torch.rand((3, 3), requires_grad=True)
>>>     loss = rpc.rpc_sync("worker1", torch.add, args=(t1, t2)).sum()
>>>     dist_autograd.backward(context_id, [loss])
torch.distributed.autograd.backward(context_id: int, roots: List[Tensor], retain_graph = False) → None
Kicks off the distributed backward pass using the provided roots. This currently implements the FAST mode algorithm which assumes all RPC messages sent in the same distributed autograd context across workers would be part of the autograd graph during the backward pass.
We use the provided roots to discover the autograd graph and compute appropriate dependencies. This method blocks until the entire autograd computation is done.
We accumulate the gradients in the appropriate torch.distributed.autograd.context
on each of the nodes. The autograd context to be used is looked up given the context_id
that is passed in when torch.distributed.autograd.backward()
is called. If there is no valid autograd context corresponding to the given ID, we throw an error. You can retrieve the accumulated gradients using the get_gradients()
API.
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     pred = model.forward()
>>>     loss = loss_func(pred, target)
>>>     dist_autograd.backward(context_id, [loss])
torch.distributed.autograd.get_gradients(context_id: int) → Dict[Tensor, Tensor]
Retrieves a map from Tensor to the appropriate gradient for that Tensor accumulated in the provided context corresponding to the given context_id
as part of the distributed autograd backward pass.
context_id (int) – The autograd context id for which we should retrieve the gradients.
A map where the key is the Tensor and the value is the associated gradient for that Tensor.
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     t1 = torch.rand((3, 3), requires_grad=True)
>>>     t2 = torch.rand((3, 3), requires_grad=True)
>>>     loss = t1 + t2
>>>     dist_autograd.backward(context_id, [loss.sum()])
>>>     grads = dist_autograd.get_gradients(context_id)
>>>     print(grads[t1])
>>>     print(grads[t2])
torch.distributed.optim
exposes DistributedOptimizer, which takes a list of remote parameters (RRef
) and runs the optimizer locally on the workers where the parameters live. The distributed optimizer can use any of the local optimizer algorithms to apply the gradients on each worker.
class torch.distributed.optim.DistributedOptimizer(optimizer_class, params_rref, *args, **kwargs)
[source]
DistributedOptimizer takes remote references to parameters scattered across workers and applies the given optimizer locally for each parameter.
This class uses get_gradients()
in order to retrieve the gradients for specific parameters.
Concurrent calls to step()
, either from the same or different clients, will be serialized on each worker – as each worker’s optimizer can only work on one set of gradients at a time. However, there is no guarantee that the full forward-backward-optimizer sequence will execute for one client at a time. This means that the gradients being applied may not correspond to the latest forward pass executed on a given worker. Also, there is no guaranteed ordering across workers.
>>> import torch.distributed.autograd as dist_autograd
>>> import torch.distributed.rpc as rpc
>>> from torch import optim
>>> from torch.distributed.optim import DistributedOptimizer
>>>
>>> with dist_autograd.context() as context_id:
>>>     # Forward pass.
>>>     rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))
>>>     rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1))
>>>     loss = rref1.to_here() + rref2.to_here()
>>>
>>>     # Backward pass.
>>>     dist_autograd.backward(context_id, [loss.sum()])
>>>
>>>     # Optimizer.
>>>     dist_optim = DistributedOptimizer(
>>>         optim.SGD,
>>>         [rref1, rref2],
>>>         lr=0.05,
>>>     )
>>>     dist_optim.step(context_id)
step(context_id)
[source]
Performs a single optimization step.
This will call torch.optim.Optimizer.step()
on each worker containing parameters to be optimized, and will block until all workers return. The provided context_id
will be used to retrieve the corresponding context
that contains the gradients that should be applied to the parameters.
context_id – the autograd context id for which we should run the optimizer step.
The distributed autograd design note covers the design of the RPC-based distributed autograd framework that is useful for applications such as model parallel training.
The RRef design note covers the design of the RRef (Remote REFerence) protocol used to refer to values on remote workers by the framework.
The RPC tutorials introduce users to the RPC framework, provide several example applications using torch.distributed.rpc APIs, and demonstrate how to use the profiler to profile RPC-based workloads.