
    iQ                         d Z ddlZddlZddlZddlmZ ddlZ ej        d          ZdZ	de
de
fdZ G d	 d
          ZdS )z
Async PostgreSQL writer for Gemini prediction market orderbook data.
Consumes from an asyncio.Queue so DB writes never bottleneck the stream.
    N)Anyz	gemini.dbz?postgresql://postgres:postgres@localhost:5432/gemini_orderbooksseriesreturnc                 \    t          j        dd|                                           }d| S )Nz[^a-zA-Z0-9]_
orderbook_)resublower)r   cleans     /var/www/html/gemini/db.py_sanitize_table_namer      s/    F?C006688E    c                   n    e Zd Zedfdedej        dz  fdZd Zd Z	defdZ
d	eeef         fd
Zd ZdS )OrderbookWriterNdsnqueuec                 |    || _         |pt          j                    | _        d | _        t                      | _        d S )N)r   asyncioQueuer   _poolset_known_tables)selfr   r   s      r   __init__zOrderbookWriter.__init__   s4    $)$<W]__
*.
'*uur   c                    K   t          j        | j        dd           d {V | _        t                              d           d S )N   
   )min_sizemax_sizezDB pool created)asyncpgcreate_poolr   r   loggerinfor   s    r   startzOrderbookWriter.start   sL      ".tx!bQQQQQQQQQ
%&&&&&r   c                    K   | j         r;| j                                          d {V  t                              d           d S d S )NzDB pool closed)r   closer#   r$   r%   s    r   stopzOrderbookWriter.stop"   sY      : 	**""$$$$$$$$$KK()))))	* 	*r   tablec                   K   || j         v rd S | j                                        4 d {V }|                    d| d           d {V  |                    d| d| d           d {V  |                    d| d| d           d {V  d d d           d {V  n# 1 d {V swxY w Y   | j                             |           t
                              d|           d S )	Nz,
                CREATE TABLE IF NOT EXISTS a   (
                    id              BIGSERIAL PRIMARY KEY,
                    ts              TIMESTAMPTZ NOT NULL DEFAULT now(),
                    instrument      TEXT NOT NULL,
                    event_ticker    TEXT NOT NULL,
                    contract_ticker TEXT NOT NULL,
                    market_interval TEXT,
                    best_bid        NUMERIC,
                    best_bid_size   NUMERIC,
                    best_ask        NUMERIC,
                    best_ask_size   NUMERIC
                );
            z0
                CREATE INDEX IF NOT EXISTS idx_z_ts ON z (ts);
            z_instrument ON z (instrument);
            zEnsured table %s)r   r   acquireexecuteaddr#   r$   )r   r*   conns      r   _ensure_tablezOrderbookWriter._ensure_table'   s     D&&&F:%%'' 	 	 	 	 	 	 	4,,  ,1               ,,  05   >C               ,,  05   FK              %	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	* 	u%%%&.....s   A$B##
B-0B-recordc                   K   t          |d                   }|                     |           d {V  | j                                        4 d {V }|                    d| d|d         |d         |d         |d         |                    d          |                    d	          rt          |d	                   nd |                    d
          rt          |d
                   nd |                    d          rt          |d                   nd |                    d          rt          |d                   nd 
  
         d {V  d d d           d {V  d S # 1 d {V swxY w Y   d S )Nr   z
                INSERT INTO z
                    (ts, instrument, event_ticker, contract_ticker,
                     market_interval, best_bid, best_bid_size,
                     best_ask, best_ask_size)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
                ts
instrumentevent_tickercontract_tickermarket_intervalbest_bidbest_bid_sizebest_askbest_ask_size)r   r0   r   r,   r-   getfloat)r   r1   r*   r/   s       r   	write_onezOrderbookWriter.write_oneB   s$     $VH%566  ''''''''':%%'' 	 	 	 	 	 	 	4,,"   t|$~&()

,---3ZZ
-C-CMfZ()))28**_2M2MWf_-...SW-3ZZ
-C-CMfZ()))28**_2M2MWf_-...SW!        	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	s   C<E!!
E+.E+c                   K   t                               d           	 g }| j                                         d{V }|                    |           t          d          D ]E}	 |                    | j                                                   0# t          j        $ r Y  nw xY w|D ]\}	 | 	                    |           d{V  # t          $ r1 t                               d|                    d                     Y Yw xY w|D ]}| j                                         t          |          dk    r@t                               dt          |          | j                                                   ])	z2Drain the queue forever, writing rows to Postgres.zDB consumer loop startedTN   zFailed to write record: %sr4   2   z(Wrote batch of %d rows (queue depth: %d))r#   r$   r   r<   appendrange
get_nowaitr   
QueueEmptyr>   	Exception	exception	task_donelenqsize)r   batchitemr   recs        r   consumer_loopzOrderbookWriter.consumer_loopZ   s     .///	h "E))))))))DLL3ZZ  LL!6!6!8!89999)   EE  Z ZZ..----------  Z Z Z$$%A377<CXCXYYYYYZ  ' '
$$&&&&5zzBFE

TXT^TdTdTfTfggg+	hs$   &,BB&%B&.C

8DD)__name__
__module____qualname__DEFAULT_DSNstrr   r   r   r&   r)   r0   dictr   r>   rN    r   r   r   r      s        "-T - -C -gmd6J - - - -' ' '* * *
/ / / / /6d38n    0h h h h hr   r   )__doc__r   loggingr	   typingr   r!   	getLoggerr#   rR   rS   r   r   rU   r   r   <module>rZ      s    
   				       		;	'	'O           
[h [h [h [h [h [h [h [h [h [hr   