U
    cc                     @   s   d Z ddlZddlmZ ddlmZ ddlmZ ddlm	Z	 ddl
mZ ddlmZ dd	lmZ d
dlmZ dZdZeddZG dd de	ZdS )zEvent receiver implementation.    N)
itemgetter)Queue)maybe_channel)ConsumerMixin)uuid)app_or_default)adjust_timestamp   )get_exchange)EventReceiver	utcoffset	timestampc                   @   s   e Zd ZdZdZdddZdd Zdd	 ZdddZdddZ	dddZ
dddZd
ejeeefddZeefddZedd ZdS )r   a?  Capture events.

    Arguments:
        connection (kombu.Connection): Connection to the broker.
        handlers (Mapping[Callable]): Event handlers.
            This is  a map of event type names and their handlers.
            The special handler `"*"` captures all events that don't have a
            handler.
    N#c
           
   	   C   s   t |p
| j| _t|| _|d kr&i n|| _|| _|p:t | _|pJ| jjj	| _
t| jp^| j | jjjd| _|d kr| jjj}|	d kr| jjj}	td| j
| jg| j| jdd||	d| _| jj| _| jj| _| jj| _|d kr| jjjdh}|| _d S )N)name.TF)exchangerouting_keyZauto_deleteZdurableZmessage_ttlexpiresjson)r   appr   channelhandlersr   r   node_idconfZevent_queue_prefixqueue_prefixr
   
connectionZconnection_for_writeZevent_exchanger   Zevent_queue_ttlZevent_queue_expiresr   joinqueueclockadjustadjust_clockforwardforward_clockZevent_serializeraccept)
selfr   r   r   r   r   r   r$   Z	queue_ttlZqueue_expires r&   P/var/www/html/project/venv/lib/python3.8/site-packages/celery/events/receiver.py__init__#   s:    


 


zEventReceiver.__init__c                 C   s(   | j |p| j d}|o"|| dS )z3Process event by dispatching to configured handler.*N)r   get)r%   typeeventhandlerr&   r&   r'   processB   s    zEventReceiver.processc                 C   s   || j g| jgd| jdgS )NT)queues	callbacksZno_ackr$   )r   _receiver$   )r%   ZConsumerr   r&   r&   r'   get_consumersG   s
     zEventReceiver.get_consumersTc                 K   s   |r| j |d d S )N)r   )wakeup_workers)r%   r   r   Z	consumerswakeupkwargsr&   r&   r'   on_consume_readyL   s    zEventReceiver.on_consume_readyc                 C   s   | j |||dS )Nlimittimeoutr4   Zconsume)r%   r8   r9   r4   r&   r&   r'   itercaptureQ   s    zEventReceiver.itercapturec                 C   s   | j |||dD ]}qdS )zOpen up a consumer capturing events.

        This has to run in the main process, and it will never stop
        unless :attr:`EventDispatcher.should_stop` is set to True, or
        forced via :exc:`KeyboardInterrupt` or :exc:`SystemExit`.
        r7   Nr:   )r%   r8   r9   r4   _r&   r&   r'   captureT   s    zEventReceiver.capturec                 C   s   | j jjd| j|d d S )N	heartbeat)r   r   )r   control	broadcastr   )r%   r   r&   r&   r'   r3   ^   s    
zEventReceiver.wakeup_workersc                 C   s   |d }|dkr4| j jpd|  }|d< | | n8z|d }	W n  tk
r`   |  |d< Y nX | |	 |rz||\}
}W n tk
r   Y nX |||
|d< | |d< ||fS )Nr+   z	task-sentr	   r   r   Zlocal_received)r   valuer!   KeyErrorr#   )r%   bodyZlocalizenowZtzfieldsr   CLIENT_CLOCK_SKEWr+   Z_cr   offsetr   r&   r&   r'   event_from_messagec   s"    

z EventReceiver.event_from_messagec                    sB   |||r.| j | j   fdd|D  n| j | |  d S )Nc                    s   g | ]} | qS r&   r&   ).0r,   Zfrom_messager.   r&   r'   
<listcomp>   s     z*EventReceiver._receive.<locals>.<listcomp>)r.   rG   )r%   rC   messagelist
isinstancer&   rI   r'   r1   ~   s    
zEventReceiver._receivec                 C   s   | j r| j jjS d S )N)r   r   client)r%   r&   r&   r'   r      s    zEventReceiver.connection)Nr   NNNNNN)T)NNT)NNT)N)__name__
__module____qualname____doc__r   r(   r.   r2   r6   r;   r=   r3   time	_TZGETTERr   rE   rG   rL   rM   r1   propertyr   r&   r&   r&   r'   r      s4   
           
 




 
r   )rR   rS   operatorr   Zkombur   Zkombu.connectionr   Zkombu.mixinsr   Zceleryr   Z
celery.appr   Zcelery.utils.timer   r,   r
   __all__rE   rT   r   r&   r&   r&   r'   <module>   s   
