o
    i}1                     @   s   d dl Z d dlZ d dlZd dlZd dlZd dlZ	 e jdddkZ	e jddZ
e
dkZdZG dd	 d	ejZed
krBe  dS dS )    NTEST_WITH_INTERNET01LOCAL_WS_SERVER_PORTz-1Tc                   @   sB  e Zd ZG dd dZdd Zdd Zeeddd	 Z	ed
ddd Z
eeddd Zeeddd Zeeddd Zeeddd Zeeddd Zeeddd Zeeddd Zeeddd Zeeddd  Zeedd!d" Zeedd#d$ Zeedd%d& Zd'S )(WebSocketAppTestc                   @   s   e Zd ZdZdS )zWebSocketAppTest.NotSetYetzI A marker class for signalling that a value hasn't been set yet.
        N)__name__
__module____qualname____doc__ r   r   S/var/www/edux/Edux_v2/venv/lib/python3.10/site-packages/websocket/tests/test_app.py	NotSetYet'   s    r   c                 C   s6   t t t t_t t_t t_t t_d S N)	wsenableTrace	TRACEABLEr   r   keep_running_openkeep_running_closeget_mask_key_idon_error_dataselfr   r   r   setUp+   s
   



zWebSocketAppTest.setUpc                 C   s,   t  t _t  t _t  t _t  t _d S r   )r   r   r   r   r   r   r   r   r   r   tearDown3   s   


zWebSocketAppTest.tearDownz/Tests using local websocket server are disabledc                    s>   dd } fdd}dd }t jdt |||d}|  d	S )
| A WebSocketApp should keep running as long as its self.keep_running
        is not False (in the boolean context).
        c                 _   s   |  d | jt_d| _dS )zn Set the keep_running flag for later inspection and immediately
            close the connection.
            hello!FN)sendkeep_runningr   r   r   argskwargsr   r   r   on_open?   s   

z1WebSocketAppTest.testKeepRunning.<locals>.on_openc                       t |    d S r   printclosewsappmessager   r   r   
on_messageG      z4WebSocketAppTest.testKeepRunning.<locals>.on_messagec                 _   s   | j t_dS )z< Set the keep_running flag for the test to use.
            N)r   r   r   r   r   r   r   on_closeK   s   z2WebSocketAppTest.testKeepRunning.<locals>.on_closews://127.0.0.1:)r!   r+   r)   Nr   WebSocketAppr   run_forever)r   r!   r)   r+   appr   r   r   testKeepRunning9   s
   z WebSocketAppTest.testKeepRunningFz$Test disabled for now (requires rel)c                    s8   dd } fdd}t jdt ||d}|jdd d	S )
r   c                 _   s    |  d |   |  d dS )z8 Send a message, receive, and send one more
            r   zgoodbye!N)r   recvr   r   r   r   r!   Z   s   
z:WebSocketAppTest.testRunForeverDispatcher.<locals>.on_openc                    r"   r   r#   r&   r   r   r   r)   a   r*   z=WebSocketAppTest.testRunForeverDispatcher.<locals>.on_messager,   )r!   r)   
Dispatcher)
dispatcherNr-   )r   r!   r)   r0   r   r   r   testRunForeverDispatcherT   s   z)WebSocketAppTest.testRunForeverDispatcherc                 C   s:   t dt }tjd|jd  | }| |d dS )zk The WebSocketApp.run_forever() method should return `False` when the application ends gracefully.
        r,   皙?intervalfunctionFN)	r   r.   r   	threadingTimerr%   startr/   assertEqual)r   r0   teardownr   r   r   testRunForeverTeardownCleanExitj   s   z0WebSocketAppTest.testRunForeverTeardownCleanExitc                    sh    fdd}dd }t jdt |d tjd|d   jd	d
}| |d | t	t
jdk dS )z The WebSocketApp.run_forever() method should return `True` when the application ends with an exception.
        It should also invoke the `on_error` callback before exiting.
        c                      s    j   d S r   )sockr%   r   r0   r   r   break_ity   s   zHWebSocketAppTest.testRunForeverTeardownExceptionalExit.<locals>.break_itc                 S   s   t |t_d S r   )strr   r   )_errr   r   r   on_error}   s   zHWebSocketAppTest.testRunForeverTeardownExceptionalExit.<locals>.on_errorr,   )rF   r6   r7   g?)ping_timeoutTr   N)r   r.   r   r:   r;   r<   r/   r=   
assertTruelenr   r   )r   rB   rF   r>   r   rA   r   %testRunForeverTeardownExceptionalExits   s   z6WebSocketAppTest.testRunForeverTeardownExceptionalExitz%Internet-requiring tests are disabledc                 C   s0   dd }t jd|d}| t|jt| dS )zi A WebSocketApp should forward the received mask_key function down
        to the actual socket.
        c                   S   s   dS )Nz    r   r   r   r   r   my_mask_key_func   s   z:WebSocketAppTest.testSockMaskKey.<locals>.my_mask_key_funcwss://api-pub.bitfinex.com/ws/1)get_mask_keyN)r   r.   r=   idrM   )r   rK   r0   r   r   r   testSockMaskKey   s   z WebSocketAppTest.testSockMaskKeyc                 C   sB   dd }dd }t jd||d}| jt j|jddd	tjid
 dS )zA Test exception handling if ping_interval < ping_timeout
        c                 S      t d |   d S NzGot a ping!r#   r0   msgr   r   r   on_ping   r*   zDWebSocketAppTest.testInvalidPingIntervalPingTimeout.<locals>.on_pingc                 S   rP   NzGot a pong! No need to respondr#   rR   r   r   r   on_pong   r*   zDWebSocketAppTest.testInvalidPingIntervalPingTimeout.<locals>.on_pongrL   rT   rV         	cert_reqsping_intervalrG   ssloptNr   r.   assertRaisesWebSocketExceptionr/   ssl	CERT_NONEr   rT   rV   r0   r   r   r   "testInvalidPingIntervalPingTimeout   s   "z3WebSocketAppTest.testInvalidPingIntervalPingTimeoutc                 C   s:   dd }dd }t jd||d}|jddd	tjid
 dS )z5 Test WebSocketApp proper ping functionality
        c                 S   rP   rQ   r#   rR   r   r   r   rT      r*   z2WebSocketAppTest.testPingInterval.<locals>.on_pingc                 S   rP   rU   r#   rR   r   r   r   rV      r*   z2WebSocketAppTest.testPingInterval.<locals>.on_pongrL   rW   rY   rX   rZ   r[   N)r   r.   r/   ra   rb   rc   r   r   r   testPingInterval   s   z!WebSocketAppTest.testPingIntervalc                 C   s   t d}|jdddd dS )z( Test WebSocketApp close opcode
        'wss://tsock.us1.twilio.com/v3/wsconnectrY   rX   zPing payload)r\   rG   ping_payloadN)r   r.   r/   r   r0   r   r   r   testOpcodeClose   s   
z WebSocketAppTest.testOpcodeClosec                 C   *   t d}| jt j|jddtjid dS )z; A WebSocketApp handling of negative ping_interval
        rL   rZ   )r\   r]   Nr^   rh   r   r   r   testBadPingInterval      
 z$WebSocketAppTest.testBadPingIntervalc                 C   rj   )z: A WebSocketApp handling of negative ping_timeout
        rL   rZ   )rG   r]   Nr^   rh   r   r   r   testBadPingTimeout   rm   z#WebSocketAppTest.testBadPingTimeoutc                 C   s   dd }t jd|d}t jt jjdd}| ddg|| t jt jjd	d}| d
d
g|| t d}t jt jjd	d}| d
d
g|| | jt j|jdd d
S )zU Test extraction of close frame status code and close reason in WebSocketApp
        c                 S   s   t d d S )Nzon_close reached)r$   )r'   close_status_code	close_msgr   r   r   r+         z6WebSocketAppTest.testCloseStatusCode.<locals>.on_closerf   )r+   s   no-init-from-client)opcodedatai  zno-init-from-client    Nztest if connection is closed)rt   )	r   r.   ABNFOPCODE_CLOSEr=   _get_close_argsr_   "WebSocketConnectionClosedExceptionr   )r   r+   r0   
closeframeapp2r   r   r   testCloseStatusCode   s   
z$WebSocketAppTest.testCloseStatusCodec                    sv   d ddd } fdd}dd }t jdt |||d	}|jd
dd | | |  t | t d dS )z+ Test callback function exception handling Nc                 S      t dNCallback failedRuntimeErrorrA   r   r   r   r!         z?WebSocketAppTest.testCallbackFunctionException.<locals>.on_openc                    s   | | d S r   r   r0   rE   exc
passed_appr   r   rF      s   z@WebSocketAppTest.testCallbackFunctionException.<locals>.on_errorc                 S   s   |    d S r   r%   rR   r   r   r   rV      rr   z?WebSocketAppTest.testCallbackFunctionException.<locals>.on_pongr,   r!   rF   rV   rY   rX   r\   rG   r   )r   r.   r   r/   r=   assertIsInstancer   rC   )r   r!   rF   rV   r0   r   r   r   testCallbackFunctionException   s   z.WebSocketAppTest.testCallbackFunctionExceptionc                 C   sH   G dd d}| }|  |j|j | |jt |  t|jd dS )z) Test callback method exception handling c                   @   s,   e Zd Zdd Zdd Zdd Zdd Zd	S )
z?WebSocketAppTest.testCallbackMethodException.<locals>.Callbacksc                 S   s>   d | _ d | _tjdt | j| j| jd| _| jj	ddd d S )Nr,   r   rY   rX   r   )
r   r   r   r.   r   r!   rF   rV   r0   r/   r   r   r   r   __init__  s   zHWebSocketAppTest.testCallbackMethodException.<locals>.Callbacks.__init__c                 S   r}   r~   r   rh   r   r   r   r!     r   zGWebSocketAppTest.testCallbackMethodException.<locals>.Callbacks.on_openc                 S   s   || _ || _d S r   )r   r   )r   r0   rE   r   r   r   rF     s   
zHWebSocketAppTest.testCallbackMethodException.<locals>.Callbacks.on_errorc                 S   s   |   d S r   r   )r   r0   rS   r   r   r   rV     rr   zGWebSocketAppTest.testCallbackMethodException.<locals>.Callbacks.on_pongN)r   r   r	   r   r!   rF   rV   r   r   r   r   	Callbacks  s
    r   r   N)r=   r   r0   r   r   r   rC   )r   r   	callbacksr   r   r   testCallbackMethodException  s
   z,WebSocketAppTest.testCallbackMethodExceptionc                    sp   dd  fdd}fdd}t jdt ||d}|jd	d
dd | d	 |  t | t d dS )z Test reconnect r   Nc                    s   | d S r   r   r   )r   r   r   rF   '  s   z0WebSocketAppTest.testReconnect.<locals>.on_errorc                    s2    d7   dkr| j    dkr|   d S d S )NrX   rY   )r@   shutdownr%   rR   )
pong_countr   r   rV   +  s   
z/WebSocketAppTest.testReconnect.<locals>.on_pongr,   )rV   rF   rY   rX      )r\   rG   	reconnectzInvalid file object: None)r   r.   r   r/   r=   r   
ValueErrorrC   )r   rF   rV   r0   r   )r   r   r   testReconnect!  s   
zWebSocketAppTest.testReconnectN)r   r   r	   r   r   r   unittest
skipUnlessTEST_WITH_LOCAL_SERVERr1   r5   r?   rJ   r   rO   rd   re   ri   rl   ro   r|   r   r   r   r   r   r   r   r   %   s@    


























r   __main__)osos.pathr:   	websocketr   ra   r   environgetr   r   r   r   TestCaser   r   mainr   r   r   r   <module>   s"     