#!/usr/bin/env python3
"""Replay a pickled queue of cleaned observations into an in-memory backlog.

The script repeatedly loads a pickled list of observation batches, folds them
into a per-station backlog on a worker thread (via ``run_in_executor``) while
holding an asyncio lock, and checks the resulting backlog size.
"""
import asyncio
import pickle
from collections import defaultdict

import zmq.asyncio


class _VarInfo(list):
    """Observation data for one variable, plus key / S3 version markers.

    Behaves like a list; entries are appended by `_StationInfo.add_var_obs`
    as ``(obs_epoch, obs_value)`` tuples.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Both version markers start unset; nothing in this file assigns them.
        self.key_version = None
        self.s3_version_id = None


class _StationInfo(defaultdict):
    """Per-station record that behaves like a dictionary.

    Maps ``var_name`` to a `_VarInfo`; missing variables are created on first
    access. The station position is kept in the ``lat``, ``lon`` and
    ``elev_m`` attributes.
    """

    def __init__(self, lat: float, lon: float, elev_m: float):
        # Initialise the parent defaultdict with `_VarInfo` as default factory:
        # { var_name: [(obs_epoch, obs_value), ...] }
        super().__init__(_VarInfo)
        self.lat = lat
        self.lon = lon
        self.elev_m = elev_m

    def add_var_obs(self, cleaned_obs_data: dict, obs_epoch: float) -> None:
        """Append one observation per variable found in `cleaned_obs_data`.

        Args:
            cleaned_obs_data: mapping of ``var_name`` to observed value.
            obs_epoch: timestamp stored alongside every value of this call.
        """
        for var_name, obs_value in cleaned_obs_data.items():
            self[var_name].append((obs_epoch, obs_value))


def _process_queued_obs(_input_lock: asyncio.Lock, _queued_obs: list,
                        _backlog: dict) -> None:
    """Drain `_queued_obs` in FIFO order into `_backlog`.

    Args:
        _input_lock: lock the caller must already hold; only checked here,
            never acquired or released.
        _queued_obs: list of batches. Each batch is an iterable of
            ``(stn_tuple, lat, lon, elev_m, obs_epoch, cleaned_obs_data)``
            tuples. Batches are removed from the list as they are consumed.
        _backlog: mapping of ``stn_tuple`` to `_StationInfo`, updated in place.

    Raises:
        AssertionError: if `_input_lock` is not held.

    Any exception raised while processing propagates unchanged; the batch
    being processed at that point has already been removed from the queue.
    """
    assert _input_lock.locked()

    while _queued_obs:
        # Process list in FIFO manner. NOTE: list.pop(0) is O(n) per call; it
        # is kept because callers pass a plain list that is drained in place.
        cleaned_data_list = _queued_obs.pop(0)
        for (stn_tuple, lat, lon, elev_m,
             obs_epoch, cleaned_obs_data) in cleaned_data_list:
            stn_info = _backlog.get(stn_tuple)
            if stn_info is None:
                stn_info = _backlog[stn_tuple] = _StationInfo(lat, lon, elev_m)
            else:
                # Update the position of an already-known station. Per the
                # original author's note, list_objects returns files from
                # oldest to newest, so the position of a given station tuple
                # is continuously replaced with the "latest" version.
                stn_info.lat = lat
                stn_info.lon = lon
                stn_info.elev_m = elev_m

            # append new data
            stn_info.add_var_obs(cleaned_obs_data, obs_epoch)


async def main():
    """Load the pickled queue 500 times and rebuild the backlog each time.

    Every iteration re-reads ``/tmp/q.pickle`` (the queue is drained by
    `_process_queued_obs`), processes it on the default executor while holding
    the input lock, and asserts that more than 30000 stations were collected.
    """
    # Use the loop this coroutine is running on instead of the module-level
    # `_loop`, which only exists when the file is executed as a script.
    loop = asyncio.get_event_loop()
    _input_lock = asyncio.Lock()

    for _ in range(500):
        # NOTE(security): unpickling executes arbitrary code and /tmp is
        # world-writable; only run this against a file you created yourself.
        with open('/tmp/q.pickle', 'rb') as f:
            queued_obs = pickle.load(f)

        backlog = dict()
        async with _input_lock:
            await loop.run_in_executor(
                None, _process_queued_obs, _input_lock, queued_obs, backlog)

        assert len(backlog) > 30000


if __name__ == '__main__':
    # Alternative loop left by the original author:
    # _loop = asyncio.get_event_loop()
    _loop = zmq.asyncio.ZMQEventLoop()
    asyncio.set_event_loop(_loop)
    _loop.run_until_complete(main())