:orphan: .. _remote-reference-protocol: Remote Reference Protocol ========================= This note describes the design details of Remote Reference protocol and walks through message flows in different scenarios. Make sure you're familiar with the :ref:`distributed-rpc-framework` before proceeding. Background ^^^^^^^^^^ RRef stands for Remote REFerence. It is a reference of an object which is located on the local or remote worker, and transparently handles reference counting under the hood. Conceptually, it can be considered as a distributed shared pointer. Applications can create an RRef by calling :meth:`~torch.distributed.rpc.remote`. Each RRef is owned by the callee worker of the :meth:`~torch.distributed.rpc.remote` call (i.e., owner) and can be used by multiple users. The owner stores the real data and keeps track of the global reference count. Every RRef can be uniquely identified by a global ``RRefId``, which is assigned at the time of creation on the caller of the :meth:`~torch.distributed.rpc.remote` call. On the owner worker, there is only one ``OwnerRRef`` instance, which contains the real data, while on user workers, there can be as many ``UserRRefs`` as necessary, and ``UserRRef`` does not hold the data. All usage on the owner will retrieve the unique ``OwnerRRef`` instance using the globally unique ``RRefId``. A ``UserRRef`` will be created when it is used as an argument or return value in :meth:`~torch.distributed.rpc.rpc_sync`, :meth:`~torch.distributed.rpc.rpc_async` or :meth:`~torch.distributed.rpc.remote` invocation, and the owner will be notified according to update the reference count. An ``OwnerRRef`` and its data will be deleted when there is no ``UserRRef`` instances globally and there are no reference to the ``OwnerRRef`` on the owner as well. Assumptions ^^^^^^^^^^^ RRef protocol is designed with the following assumptions. - **Transient Network Failures**: The RRef design handles transient network failures by retrying messages. It cannot handle node crashes or permanent network partitions. When those incidents occur, the application should take down all workers, revert to the previous checkpoint, and resume training. - **Non-idempotent UDFs**: We assume the user functions (UDF) provided to :meth:`~torch.distributed.rpc.rpc_sync`, :meth:`~torch.distributed.rpc.rpc_async` or :meth:`~torch.distributed.rpc.remote` are not idempotent and therefore cannot be retried. However, internal RRef control messages are idempotent and retried upon message failure. - **Out of Order Message Delivery**: We do not assume message delivery order between any pair of nodes, because both sender and receiver are using multiple threads. There is no guarantee on which message will be processed first. RRef Lifetime ^^^^^^^^^^^^^ The goal of the protocol is to delete an ``OwnerRRef`` at an appropriate time. The right time to delete an ``OwnerRRef`` is when there are no living ``UserRRef`` instances and user code is not holding references to the ``OwnerRRef`` either. The tricky part is to determine if there are any living ``UserRRef`` instances. Design Reasoning ---------------- A user can get a ``UserRRef`` in three situations: 1) Receiving a ``UserRRef`` from the owner. 2) Receiving a ``UserRRef`` from another user. 3) Creating a new ``UserRRef`` owned by another worker. Case 1 is the simplest where the owner passes its RRef to a user, where the owner calls :meth:`~torch.distributed.rpc.rpc_sync`, :meth:`~torch.distributed.rpc.rpc_async`, or :meth:`~torch.distributed.rpc.remote` and uses its RRef as an argument. In this case a new ``UserRRef`` will be created on the user. As the owner is the caller, it can easily update its local reference count on the ``OwnerRRef``. The only requirement is that any ``UserRRef`` must notify the owner upon destruction. Hence, we need the first guarantee: **G1. The owner will be notified when any UserRRef is deleted.** As messages might come delayed or out-of-order, we need one more guarantee to make sure the delete message is not processed too soon. If A sends a message to B that involves an RRef, we call the RRef on A (the parent RRef) and the RRef on B (the child RRef). **G2. Parent RRef will NOT be deleted until the child RRef is confirmed by the owner.** In cases 2 and 3, it is possible that the owner has only partial or no knowledge at all about the RRef fork graph. For example, an RRef could be constructed on a user, and before the owner receives any RPC call, the creator user might have already shared the RRef with other users, and those users could further share the RRef. One invariant is that the fork graph of any RRef is always a tree, because forking an RRef always creates a new ``UserRRef`` instance on the callee (except if the callee is the owner), and hence every RRef has a single parent. The owner's view on any ``UserRRef`` in the tree has three stages: .. code:: 1) unknown -> 2) known -> 3) deleted. The owner's view of the entire tree keeps changing. The owner deletes its ``OwnerRRef`` instance when it thinks there are no living ``UserRRef`` instances, i.e., when ``OwnerRRef`` is deleted, all ``UserRRef`` instances could be either indeed deleted or unknown. The dangerous case is when some forks are unknown and others are deleted. **G2** trivially guarantees that no parent ``UserRRef`` can be deleted before the owner knows all of its children ``UserRRef`` instances. However, it is possible that the child ``UserRRef`` may be deleted before the owner knows its parent ``UserRRef``. Consider the following example, where the ``OwnerRRef`` forks to A, then A forks to Y, and Y forks to Z: .. code:: OwnerRRef -> A -> Y -> Z If all of Z's messages, including the delete message, are processed by the owner before Y's messages. the owner will learn of Z's deletion befores knowing Y exists. Nevertheless, this does not cause any problem. Because, at least one of Y's ancestors will be alive (A) and it will prevent the owner from deleting the ``OwnerRRef``. More specifically, if the owner does not know Y, A cannot be deleted due to **G2**, and the owner knows A since it is A's parent. Things get a little trickier if the RRef is created on a user: .. code:: OwnerRRef ^ | A -> Y -> Z If Z calls :meth:`~torch.distributed.rpc.RRef.to_here` on the ``UserRRef``, the owner at least knows A when Z is deleted, because otherwise, :meth:`~torch.distributed.rpc.RRef.to_here` wouldn't finish. If Z does not call :meth:`~torch.distributed.rpc.RRef.to_here`, it is possible that the owner receives all messages from Z before any message from A and Y. In this case, as the real data of the ``OwnerRRef`` has not been created yet, there is nothing to be deleted either. It is the same as Z does not exist at all. Hence, it's still OK. Implementation -------------- **G1** is implemented by sending out a delete message in ``UserRRef`` destructor. To provide **G2**, the parent ``UserRRef`` is put into a context whenever it is forked, indexed by the new ``ForkId``. The parent ``UserRRef`` is only removed from the context when it receives an acknowledgement message (ACK) from the child, and the child will only send out the ACK when it is confirmed by the owner. Protocol Scenarios ^^^^^^^^^^^^^^^^^^ Let's now discuss how the above designs translate to the protocol in four scenarios. User Share RRef with Owner as Return Value ------------------------------------------ .. code:: import torch import torch.distributed.rpc as rpc # on worker A rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1)) # say the rref has RRefId 100 and ForkId 1 rref.to_here() In this case, the ``UserRRef`` is created on the user worker A, then it is passed to the owner worker B together with the remote message, and then B creates the ``OwnerRRef``. The method :meth:`~torch.distributed.rpc.remote` returns immediately, meaning that the ``UserRRef`` can be forked/used before the owner knows about it. On the owner, when receiving the :meth:`~torch.distributed.rpc.remote` call, it will create the ``OwnerRRef``, and returns an ACK to acknowledge ``{100, 1}`` (``RRefId``, ``ForkId``). Only after receiving this ACK, can A delete its ``UserRRef``. This involves both **G1** and **G2**. **G1** is obvious. For **G2**, the ``OwnerRRef`` is a child of the ``UserRRef``, and the ``UserRRef`` is not deleted until it receives the ACK from the owner. .. image:: https://user-images\.githubusercontent\.com/16999635/69164772-98181300-0abe-11ea-93a7-9ad9f757cd94.png :alt: user_to_owner_ret.png :width: 500 px The diagram above shows the message flow, where solid arrow contains user function and dashed arrow are builtin messages. Note that the first two messages from A to B (:meth:`~torch.distributed.rpc.remote` and :meth:`~torch.distributed.rpc.RRef.to_here`) may arrive at B in any order, but the final delete message will only be sent out when: - B acknowledges ``UserRRef {100, 1}`` (G2), and - Python GC agrees to delete the local ``UserRRef`` instance. This occurs when the RRef is no longer in scope and is eligible for garbage collection. User Share RRef with Owner as Argument -------------------------------------- .. code:: import torch import torch.distributed.rpc as rpc # on worker A and worker B def func(rref): pass # on worker A rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1)) # say the rref has RRefId 100 and ForkId 1 rpc.rpc_async('B', func, args=(rref, )) In this case, after creating the ``UserRRef`` on A, A uses it as an argument in a followup RPC call to B. A will keep ``UserRRef {100, 1}`` alive until it receives the acknowledge from B (**G2**, not the return value of the RPC call). This is necessary because A should not send out the delete message until all previous messages are received, otherwise, the ``OwnerRRef`` could be deleted before usage as we do not guarantee message delivery order. This is done by creating a child ``ForkId`` of RRef, holding them in a map until receives the owner confirms the child ``ForkId``. The figure below shows the message flow. .. image:: https://user-images.githubusercontent.com/16999635/69164845-b67e0e80-0abe-11ea-93fa-d24674e75a2b.png :alt: user_to_owner_arg.png :width: 500 px Note that the ``UserRRef`` could be deleted on B before func finishes or even starts. However this is OK, as at the time B sends out ACK for the child ``ForkId``, it already acquired the ``OwnerRRef`` instance, which would prevent it been deleted too soon. Owner Share RRef with User -------------------------- Owner to user is the simplest case, where the owner can update reference counting locally, and does not need any additional control message to notify others. Regarding **G2**, it is same as the parent receives the ACK from the owner immediately, as the parent is the owner. .. code:: import torch import torch.distributed.rpc as RRef, rpc # on worker B and worker C def func(rref): pass # on worker B, creating a local RRef rref = RRef("data") # say the rref has RRefId 100 dist.rpc_async('C', func, args=(rref, )) .. image:: https://user-images.githubusercontent.com/16999635/69164921-c990de80-0abe-11ea-9250-d32ad00cf4ae.png :alt: owner_to_user.png :width: 500 px The figure above shows the message flow. Note that when the ``OwnerRRef`` exits scope after the rpc_async call, it will not be deleted, because internally there is a map to hold it alive if there is any known forks, in which case is ``UserRRef {100, 1}``. (**G2**) User Share RRef with User ------------------------- This is the most complicated case where caller user (parent ``UserRRef``), callee user (child ``UserRRef``), and the owner all need to get involved. .. code:: import torch import torch.distributed.rpc as rpc # on worker A and worker C def func(rref): pass # on worker A rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1)) # say the rref has RRefId 100 and ForkId 1 rpc.rpc_async('C', func, args=(rref, )) .. image:: https://user-images.githubusercontent.com/16999635/69164971-d6adcd80-0abe-11ea-971d-6b7af131f0fd.png :alt: user_to_user.png :width: 500 px When C receives the child ``UserRRef`` from A, it sends out a fork request to the owner B. Later, when the B confirms the ``UserRRef`` on C, C will perform two actions in parallel: 1) send out the child ACK to A ,and 2) run the user provided function. During this time, the parent (A) will hold its ``UserRRef {100, 1}`` alive to achieve **G2**.