import io
from typing import Any, Callable, cast

import torch
import torch.distributed as dist
from torch._utils import _get_device_module
from torch.distributed._shard.metadata import ShardMetadata
from torch.distributed._shard.sharded_tensor import ShardedTensor
from torch.distributed.tensor import DTensor
from torch.distributed.tensor._utils import compute_local_shape_and_global_offset

from .metadata import (
    BytesStorageMetadata,
    ChunkStorageMetadata,
    MetadataIndex,
    STATE_DICT_TYPE,
    STORAGE_TYPES,
    TensorProperties,
    TensorStorageMetadata,
)
from .planner import (
    LoadItemType,
    ReadItem,
    SavePlan,
    TensorWriteData,
    WriteItem,
    WriteItemType,
)
from .resharding import (
    _check_shard_metadata_pair_overlap,
    _shards_get_overlap_region_wrt_saved_tensor,
)


__all__: list[str] = ["create_read_items_for_chunk_list"]


def _compare_save_plans(plan: SavePlan, other_plan: SavePlan) -> bool:
    """
    Compare the two Save plans and return True if they are equal.

    Args:
        plan (SavePlan): First SavePlan to compare.
        other_plan (SavePlan): Second SavePlan to compare.

    Returns:
       True if the two plans are equal, False otherwise.
    """
    if plan.usable != other_plan.usable:
        return False

    if len(plan.items) != len(other_plan.items):
        return False

    for plan_item, other_plan_item in zip(plan.items, other_plan.items):
        if plan_item.type != other_plan_item.type:
            return False

        plan_metadata_index = plan_item.index
        other_plan_metadata_index = other_plan_item.index
        if (
            plan_metadata_index.fqn != other_plan_metadata_index.fqn
            or plan_metadata_index.offset != other_plan_metadata_index.offset
            or plan_metadata_index.index != other_plan_metadata_index.index
        ):
            return False

        tensor_data = plan_item.tensor_data
        other_tensor_data = other_plan_item.tensor_data

        # Both items must either carry tensor metadata or carry none at all.
        if (tensor_data and not other_tensor_data) or (
            not tensor_data and other_tensor_data
        ):
            return False

        if tensor_data and other_tensor_data:
            if tensor_data.size != other_tensor_data.size:
                return False

            chunk = tensor_data.chunk
            other_chunk = other_tensor_data.chunk
            if (chunk and not other_chunk) or (not chunk and other_chunk):
                return False

            if chunk and other_chunk and (
                chunk.offsets != other_chunk.offsets or chunk.sizes != other_chunk.sizes
            ):
                return False

    return True


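# Usage sketch (illustrative only, not part of this module's API): a caching save
# planner can use the comparison helpers above and below to decide whether the
# plans gathered this iteration differ from previously cached ones, e.g.
#
#     unchanged = all(
#         _compare_save_plans(cached, new)
#         for cached, new in zip(cached_plans, new_plans)
#     )
#
# The ``cached_plans`` / ``new_plans`` names here are hypothetical.
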
def _contains_usable_plan(delta_plans: list[SavePlan]) -> bool:
    """
    Check if any delta plan is usable, indicating the plan has changed.

    Args:
        delta_plans (List[SavePlan]): A list of delta plans to check.
    Returns:
        True if any delta plan is usable, False otherwise.
    """
    return any(delta_plan and delta_plan.usable for delta_plan in delta_plans)


def _merge_delta_local_plans(
    cached_plans: list[SavePlan], delta_plans: list[SavePlan]
) -> list[SavePlan]:
    """
    Merge a list of delta plans into a single plan.

    Args:
        cached_plans (List[SavePlan]): A list of cached plans.
        delta_plans (List[SavePlan]): A list of delta plans to merge. It can contain empty plans

    Returns:
        A single merged plan. If a delta plan is not usable, use the cached plan. Otherwise, use the delta plan.
    """
    merged_plans = []
    for cached_plan, delta_plan in zip(cached_plans, delta_plans):
        if not delta_plan or not delta_plan.usable:
            merged_plans.append(cached_plan)
        else:
            merged_plans.append(delta_plan)
    return merged_plans


def _create_chunk_from_tensor(tensor: torch.Tensor) -> ChunkStorageMetadata:
    return ChunkStorageMetadata(
        offsets=torch.Size([0] * len(tensor.size())), sizes=tensor.size()
    )


def _chunk_for_shard(shard_md: ShardMetadata) -> ChunkStorageMetadata:
    return ChunkStorageMetadata(
        offsets=torch.Size(shard_md.shard_offsets),
        sizes=torch.Size(shard_md.shard_sizes),
    )


def _sharded_tensor_metadata(
    sharded_tensor: ShardedTensor, shard_md: ShardMetadata
) -> TensorWriteData:
    shard_properties = sharded_tensor.metadata().tensor_properties

    properties = TensorProperties(
        dtype=shard_properties.dtype,
        layout=shard_properties.layout,
        requires_grad=shard_properties.requires_grad,
        memory_format=shard_properties.memory_format,
        pin_memory=shard_properties.pin_memory,
    )

    return TensorWriteData(
        chunk=_chunk_for_shard(shard_md),
        properties=properties,
        size=sharded_tensor.metadata().size,
    )


def _create_write_items_for_dtensor(fqn: str, tensor: DTensor) -> WriteItem:
    sizes, offsets = compute_local_shape_and_global_offset(
        tensor.shape, tensor.device_mesh, tensor.placements
    )
    sizes, offsets = torch.Size(sizes), torch.Size(offsets)

    return WriteItem(
        index=MetadataIndex(fqn, offsets),
        type=WriteItemType.SHARD,
        tensor_data=TensorWriteData(
            chunk=ChunkStorageMetadata(
                offsets=offsets,
                sizes=sizes,
            ),
            properties=TensorProperties.create_from_tensor(tensor.to_local()),
            size=tensor.size(),
        ),
    )


def _create_write_item_for_shard(
    fqn: str, sharded_tensor: ShardedTensor, shard_md: ShardMetadata
) -> WriteItem:
    offsets = torch.Size(shard_md.shard_offsets)
    return WriteItem(
        index=MetadataIndex(fqn, offsets),
        type=WriteItemType.SHARD,
        tensor_data=_sharded_tensor_metadata(sharded_tensor, shard_md),
    )


def _create_write_item_for_tensor(fqn: str, tensor: torch.Tensor) -> WriteItem:
    offsets = torch.Size([0] * len(tensor.size()))
    return WriteItem(
        index=MetadataIndex(fqn, offsets),
        type=WriteItemType.TENSOR,
        tensor_data=TensorWriteData(
            chunk=ChunkStorageMetadata(offsets=offsets, sizes=tensor.size()),
            properties=TensorProperties.create_from_tensor(tensor),
            size=tensor.size(),
        ),
    )


def _create_write_item_for_bytesio(fqn: str, bytes: Any) -> WriteItem:
    return WriteItem(
        index=MetadataIndex(fqn),
        type=WriteItemType.BYTE_IO,
    )


def _create_read_item_for_byteio(
    dest_index, dest_offset, storage_index, storage_offset, length
):
    return ReadItem(
        type=LoadItemType.BYTE_IO,
        dest_index=dest_index,
        dest_offsets=torch.Size((dest_offset,)),
        storage_index=storage_index,
        storage_offsets=torch.Size((storage_offset,)),
        lengths=torch.Size((length,)),
    )


def _create_read_item_for_tensor(
    dest_index, dest_offsets, storage_index, storage_offsets, lengths
):
    return ReadItem(
        type=LoadItemType.TENSOR,
        dest_index=dest_index,
        dest_offsets=torch.Size(dest_offsets),
        storage_index=storage_index,
        storage_offsets=torch.Size(storage_offsets),
        lengths=torch.Size(lengths),
    )


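# Illustrative example (hedged; the numbers are made up): when a saved chunk
# covering global offsets [0, 8) of a 1-D tensor is loaded into a local chunk
# covering global offsets [4, 12), the overlap region is [4, 8). The read item
# produced for that pair carries storage_offsets=[4] (relative to the saved
# chunk), dest_offsets=[0] (relative to the local chunk), and lengths=[4].
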
def create_read_items_for_chunk_list(
    fqn: str,
    checkpoint_md: TensorStorageMetadata,
    local_chunks: list[ChunkStorageMetadata],
) -> list[ReadItem]:
    """
    Create a list of ``ReadItem`` based on the checkpoint and local chunks.

    This applies the resharding algorithm and computes the reads needed
    to satisfy ``local_chunks`` with a checkpoint described by ``checkpoint_md``.

    Args:
        fqn (str): The state_dict FQN to pass to ``ReadItem``.
        checkpoint_md (TensorStorageMetadata): metadata for a given tensor
            from a checkpoint.
        local_chunks (List[ChunkStorageMetadata]): Local chunks that need to be
            loaded.

    Returns:
        A list of ``ReadItem`` that will satisfy all input chunks.
    """
    read_items = []
    # This is a naive quadratic algorithm that can be optimized later.
    for idx, shard in enumerate(local_chunks):
        for storage_idx, storage_md in enumerate(checkpoint_md.chunks):
            if not _check_shard_metadata_pair_overlap(shard, storage_md):
                continue

            storage_offsets = []
            dest_offsets = []
            lengths = []
            for (
                _dim,
                offset_for_saved_tensor,
                offset_for_current_tensor,
                length,
            ) in _shards_get_overlap_region_wrt_saved_tensor(
                saved_shard=storage_md, current_shard=shard
            ):
                storage_offsets.append(offset_for_saved_tensor)
                dest_offsets.append(offset_for_current_tensor)
                lengths.append(length)

            read_items.append(
                _create_read_item_for_tensor(
                    dest_index=MetadataIndex(fqn, shard.offsets, idx),
                    dest_offsets=dest_offsets,
                    storage_index=MetadataIndex(fqn, storage_md.offsets, storage_idx),
                    storage_offsets=storage_offsets,
                    lengths=lengths,
                )
            )
    return read_items


def _create_default_metadata_only_plan(state_dict: STATE_DICT_TYPE) -> SavePlan:
    requests = []
    for fqn, obj in state_dict.items():
        if isinstance(obj, DTensor):
            requests.append(_create_write_items_for_dtensor(fqn, obj))
        elif isinstance(obj, ShardedTensor):
            requests.extend(
                _create_write_item_for_shard(fqn, obj, shard_md)
                for shard_md in obj.metadata().shards_metadata
            )
        elif isinstance(obj, torch.Tensor):
            requests.append(_create_write_item_for_tensor(fqn, obj))
        else:
            requests.append(_create_write_item_for_bytesio(fqn, obj))
    return SavePlan(requests)


def _create_write_items(fqn: str, object: Any) -> list[WriteItem]:
    if hasattr(object, "__create_write_items__"):
        # Checkpointable objects (e.g. DTensor) provide their own write items.
        return object.__create_write_items__(fqn, object)
    elif isinstance(object, ShardedTensor):
        return [
            _create_write_item_for_shard(fqn, object, shard.metadata)
            for shard in object.local_shards()
        ]
    elif isinstance(object, torch.Tensor):
        return [_create_write_item_for_tensor(fqn, object)]
    else:
        return [_create_write_item_for_bytesio(fqn, object)]


def _create_chunk_from_dtensor(tensor: DTensor) -> ChunkStorageMetadata:
    sizes, offsets = compute_local_shape_and_global_offset(
        tensor.shape, tensor.device_mesh, tensor.placements
    )
    sizes, offsets = torch.Size(sizes), torch.Size(offsets)
    return ChunkStorageMetadata(offsets=offsets, sizes=sizes)


def _create_chunk_list(tensor: torch.Tensor) -> list[ChunkStorageMetadata]:
    if hasattr(tensor, "__create_chunk_list__"):
        # Checkpointable objects (e.g. DTensor) provide their own chunk list.
        local_chunks = tensor.__create_chunk_list__()
    elif isinstance(tensor, ShardedTensor):
        local_chunks = [
            _chunk_for_shard(shard.metadata) for shard in tensor.local_shards()
        ]
    elif isinstance(tensor, torch.Tensor):
        local_chunks = [_create_chunk_from_tensor(tensor)]
    else:
        raise ValueError(
            "Unsupported Type, expecting one of [Tensor, DTensor, ShardedTensor], "
            f"but got {type(tensor)}"
        )

    return local_chunks


def _create_read_items(fqn: str, md: STORAGE_TYPES, obj: Any) -> list[ReadItem]:
    if not isinstance(md, BytesStorageMetadata):
        try:
            local_chunks = _create_chunk_list(obj)
        except ValueError as ex:
            raise ValueError(
                f"Invalid checkpoint metadata for {fqn}, "
                f"expected BytesStorageMetadata but found {type(md)}"
            ) from ex

        return create_read_items_for_chunk_list(fqn, md, local_chunks)
    else:
        return [
            _create_read_item_for_byteio(
                dest_index=MetadataIndex(fqn),
                dest_offset=0,
                storage_index=MetadataIndex(fqn),
                storage_offset=0,
                length=0,
            )
        ]


def _init_state_dict(state_dict: dict[str, Any]) -> Any:
    """
    Initializes meta tensor if the meta tensor is DTensor or torch.Tensor.
    """

    def dtensor_func(value: DTensor) -> DTensor:
        device = getattr(value, "device", None)
        if device == torch.device("meta"):
            device_type = dist.distributed_c10d._get_pg_default_device().type
            device = cast(
                torch.device, _get_device_module(device_type).current_device()
            )
            new_local_tensor = torch.empty_like(value.to_local(), device=device)
            # Pass the global shape and stride explicitly, since the DTensor may
            # be unevenly sharded.
            dtensor = DTensor.from_local(
                new_local_tensor,
                device_mesh=value.device_mesh,
                placements=value.placements,
                shape=value.size(),
                stride=value.stride(),
            )
            return dtensor
        else:
            return value

    def sharded_tensor_func(value: Any) -> Any:
        device = getattr(value, "device", None)
        if device == torch.device("meta"):
            raise RuntimeError(
                f"Found unsupported type {type(value)} for meta device loading."
            )
        else:
            return value

    def tensor_func(value: torch.Tensor) -> torch.Tensor:
        device = getattr(value, "device", None)
        if device == torch.device("meta"):
            device_type = dist.distributed_c10d._get_pg_default_device().type
            device = cast(
                torch.device, _get_device_module(device_type).current_device()
            )
            tensor = torch.empty_like(value, device=device)
            return tensor
        else:
            return value

    _iterate_state_dict(
        state_dict,
        sharded_tensor_func,
        dtensor_func,
        tensor_func,
    )


def _iterate_state_dict(
    iter_object: Any,
    sharded_tensor_func: Callable,
    dtensor_func: Callable,
    tensor_func: Callable,
):
    """
    Iterate through the state dict, applying the given functions to each tensor type
    and update the state dict in place.

    Args:
        iter_object (Any): the target state_dict.
        sharded_tensor_func (Callable): the function to apply to ShardedTensor
        dtensor_func (Callable): the function to apply to DTensor
        tensor_func (Callable): the function to apply to Tensor

    # TODO: let state_dict_util._iterate_state_dict() to support in place option
    so we don't need to have two versions of _iterate_state_dict.
    """
    if isinstance(iter_object, DTensor):
        return dtensor_func(iter_object)
    elif isinstance(iter_object, ShardedTensor):
        return sharded_tensor_func(iter_object)
    elif isinstance(iter_object, torch.Tensor):
        return tensor_func(iter_object)
    elif (
        isinstance(iter_object, (int, float, str, bytes, io.BytesIO))
        or iter_object is None
    ):
        return iter_object
    elif isinstance(iter_object, dict):
        for key, value in iter_object.items():
            iter_object[key] = _iterate_state_dict(
                value, sharded_tensor_func, dtensor_func, tensor_func
            )
        return iter_object
    elif isinstance(iter_object, (list, tuple)):
        ret = [
            _iterate_state_dict(v, sharded_tensor_func, dtensor_func, tensor_func)
            for v in iter_object
        ]
        if isinstance(iter_object, tuple):
            ret = tuple(ret)
        return ret

