U
    ccv                     @   sJ  d Z ddlZddlZddlZddlZddlmZ ddlmZ ddl	m
Z
 ddlmZmZ ddlmZmZ ddlmZ d	d
lmZ dZdeeeZejdkrddlZddlZddlZejZdZ ej!Z"e# Z$dd Z%dd Z&nBejdkrddl'Z'ddl'mZm"Z"m Z  dd Z%dd Z&ne(dG dd dej)Z)G dd dej*Z*dS )a=	  File-system Transport module for kombu.

Transport using the file-system as the message store. Messages written to the
queue are stored in `data_folder_in` directory and
messages read from the queue are read from `data_folder_out` directory. Both
directories must be created manually. Simple example:

* Producer:

.. code-block:: python

    import kombu

    conn = kombu.Connection(
        'filesystem://', transport_options={
            'data_folder_in': 'data_in', 'data_folder_out': 'data_out'
        }
    )
    conn.connect()

    test_queue = kombu.Queue('test', routing_key='test')

    with conn as conn:
        with conn.default_channel as channel:
            producer = kombu.Producer(channel)
            producer.publish(
                        {'hello': 'world'},
                        retry=True,
                        exchange=test_queue.exchange,
                        routing_key=test_queue.routing_key,
                        declare=[test_queue],
                        serializer='pickle'
            )

* Consumer:

.. code-block:: python

    import kombu

    conn = kombu.Connection(
        'filesystem://', transport_options={
            'data_folder_in': 'data_out', 'data_folder_out': 'data_in'
        }
    )
    conn.connect()

    def callback(body, message):
        print(body, message)
        message.ack()

    test_queue = kombu.Queue('test', routing_key='test')

    with conn as conn:
        with conn.default_channel as channel:
            consumer = kombu.Consumer(
                conn, [test_queue], accept=['pickle']
            )
            consumer.register_callback(callback)
            with consumer:
                conn.drain_events(timeout=1)

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: No
* Supports Priority: No
* Supports TTL: No

Connection String
=================
Connection string is in the following format:

.. code-block::

    filesystem://

Transport Options
=================
* ``data_folder_in`` - directory where are messages stored when written
  to queue.
* ``data_folder_out`` - directory from which are messages read when read from
  queue.
* ``store_processed`` - if set to True, all processed messages are backed up to
  ``processed_folder``.
* ``processed_folder`` - directory where are backed up processed files.
    N)Empty)	monotonic)ChannelError)bytes_to_strstr_to_bytes)dumpsloads)cached_property   )virtual)r
   r   r   .ntc                 C   s$   t |  }t ||ddt dS )Create file lock.r         N)	win32file_get_osfhandlefilenoZ
LockFileEx__overlapped)fileflagshfile r   T/var/www/html/project/venv/lib/python3.8/site-packages/kombu/transport/filesystem.pylocky   s    r   c                 C   s"   t |  }t |ddt dS )Remove file lock.r   r   N)r   r   r   ZUnlockFileExr   )r   r   r   r   r   unlock~   s    r   posix)LOCK_EXLOCK_NBLOCK_SHc                 C   s   t |  | dS )r   N)fcntlflockr   )r   r   r   r   r   r      s    c                 C   s   t |  t j dS )r   N)r    r!   r   ZLOCK_UN)r   r   r   r   r      s    z9Filesystem plugin only defined for NT and POSIX platformsc                   @   sl   e Zd ZdZdd Zdd Zdd Zdd	 Zed
d Z	e
dd Ze
dd Ze
dd Ze
dd ZdS )ChannelzFilesystem Channel.c                 K   s   d ttt d t |}tj| j	|}zTz*t|d}t|t |tt| W n$ tk
r   td|dY nX W 5 t
| |  X dS )zPut `message` onto `queue`.z{}_{}.{}.msgi  wbzCannot add file z to directoryN)formatintroundr   uuiduuid4ospathjoindata_folder_outr   closeopenr   r   writer   r   OSErrorr   )selfqueuepayloadkwargsfilenamefr   r   r   _put   s     


zChannel._putc                 C   s  d| d }t | j}t|}t|dkr|d}||dk rFq | jrT| j}nt	
 }ztt j| j|| W n tk
r   Y nX t j||}z.t|d}| }|  | jst | W n$ tk
r   td|dY nX tt|S t dS )zGet next message from `queue`.r   .msgr   rbzCannot read file z from queue.N)r)   listdirdata_folder_insortedlenpopfindstore_processedprocessed_foldertempfile
gettempdirshutilmover*   r+   r0   r.   readr-   remover   r   r   r   )r1   r2   
queue_findfolderr5   rA   r6   r3   r   r   r   _get   s:    



zChannel._getc                 C   s   d}d| d }t | j}t|dkr| }z8||dk rDW qt j| j|}t | |d7 }W q t	k
r|   Y qX q|S )z!Remove all messages from `queue`.r   r   r8   r
   )
r)   r:   r;   r=   r>   r?   r*   r+   rG   r0   r1   r2   countrH   rI   r5   r   r   r   _purge   s    
zChannel._purgec                 C   sN   d}d| d}t | j}t|dkrJ| }||dk r@q|d7 }q|S )z<Return the number of messages in `queue` as an :class:`int`.r   r   r8   r
   )r)   r:   r;   r=   r>   r?   rK   r   r   r   _size   s    
zChannel._sizec                 C   s
   | j jjS N)
connectionclienttransport_optionsr1   r   r   r   rR      s    zChannel.transport_optionsc                 C   s   | j ddS )Nr;   Zdata_inrR   getrS   r   r   r   r;      s    zChannel.data_folder_inc                 C   s   | j ddS )Nr,   Zdata_outrT   rS   r   r   r   r,     s    zChannel.data_folder_outc                 C   s   | j ddS )Nr@   FrT   rS   r   r   r   r@     s    zChannel.store_processedc                 C   s   | j ddS )NrA   	processedrT   rS   r   r   r   rA   	  s    zChannel.processed_folderN)__name__
__module____qualname____doc__r7   rJ   rM   rN   propertyrR   r	   r;   r,   r@   rA   r   r   r   r   r"      s   '



r"   c                       s@   e Zd ZdZeZe ZdZdZ	dZ
 fddZdd Z  ZS )	TransportzFilesystem Transport.r   
filesystemc                    s   t  j|f| | j| _d S rO   )super__init__global_statestate)r1   rQ   r4   	__class__r   r   r_     s    zTransport.__init__c                 C   s   dS )NzN/Ar   rS   r   r   r   driver_version  s    zTransport.driver_version)rW   rX   rY   rZ   r"   r   ZBrokerStater`   default_portZdriver_typeZdriver_namer_   rd   __classcell__r   r   rb   r   r\     s   r\   )+rZ   r)   rD   rB   r'   r2   r   timer   Zkombu.exceptionsr   Zkombu.utils.encodingr   r   Zkombu.utils.jsonr   r   Zkombu.utils.objectsr	    r   VERSIONr+   mapstr__version__nameZ
pywintypesZwin32conr   ZLOCKFILE_EXCLUSIVE_LOCKr   r   ZLOCKFILE_FAIL_IMMEDIATELYr   Z
OVERLAPPEDr   r   r   r    RuntimeErrorr"   r\   r   r   r   r   <module>   sB   Z


z