Message267884
hello,
I am using asyncio to handle 165 udp-packet per second
everything received by (datagram_received) will be put into a queue.
+++
124605 UDP packets send from client.
and received by server network:
dumpcap ( filter "port 59999 and len > 100" )
Packets: 124605
correct number send and received. 124605
++++++
received by application:
def datagram_received(self, data, addr):
[2016-06-08 14:59:49] total udp = 124255,queue size =0
[2016-06-08 14:59:49] Got 124255 json report from server.
only 124255 received by application.
124605 - 124255 = 350 udp , received by network card , but application never got it.
+++
code:
#########################################
class UDPProtocolServerTraffica:
def __init__(self):
self.transport = None
# heart beat timer
self.HEARTBEAT_TIMEOUT = 10.0
self.REPORTSHOWTOTAL_TIMEOUT=60.0
self.MSG_UDP_HEARTBEAT = b'\x01\x00\x00\x00\x11\x00\x00\x00\x01\x00\x00\x00\x00\x05\x00\x00\x00'
self.UDPCount = 0
def connection_made(self, transport):
self.transport = transport
# traffica startup message
self.transport.sendto(self.MSG_UDP_HEARTBEAT, (fns_remote_host, fns_remote_port))
# start 10 seconds timeout timer
self.h_timeout = asyncio.get_event_loop().call_later(self.HEARTBEAT_TIMEOUT, self.timeout_heartbeat)
# show total report
self.h_timeout = asyncio.get_event_loop().call_later(self.REPORTSHOWTOTAL_TIMEOUT, self.timeout_report_showtotal)
def datagram_received(self, data, addr):
#fns_mmdu_ipaddr = addr [0]
#fns_mmdu_port = addr [1]
Report_Id = (int.from_bytes(data[0:2], byteorder='big'))
if Report_Id != 327:
self.UDPCount += 1
# send message to queue
asyncio_queue.put_nowait(data)
def pause_reading(self):
print('pause_reading')
def resume_reading(self):
print('resume_reading')
def error_received(self, exc):
print('Error received:', exc)
def connection_lost(self, exc):
print('stop', exc)
def timeout_heartbeat(self):
self.transport.sendto(self.MSG_UDP_HEARTBEAT, (fns_remote_host, fns_remote_port))
self.h_timeout = asyncio.get_event_loop().call_later(self.HEARTBEAT_TIMEOUT, self.timeout_heartbeat)
#print('queue size ',asyncio_queue.qsize())
def timeout_report_showtotal(self):
self.h_timeout = asyncio.get_event_loop().call_later(self.REPORTSHOWTOTAL_TIMEOUT, self.timeout_report_showtotal)
self.displayReportTotalCount()
elasticsearch_get_all_reports()
def displayReportTotalCount(self):
logging.info('Total udp from fns: ' + str(self.UDPCount) + ' , queue size: ' + str(asyncio_queue.qsize()) )
regards!
Valdemar |
|
Date |
User |
Action |
Args |
2016-06-08 20:17:39 | valdemar.pavesi | set | recipients:
+ valdemar.pavesi, gvanrossum, vstinner, yselivanov |
2016-06-08 20:17:39 | valdemar.pavesi | set | messageid: <1465417059.51.0.620953300582.issue27271@psf.upfronthosting.co.za> |
2016-06-08 20:17:39 | valdemar.pavesi | link | issue27271 messages |
2016-06-08 20:17:38 | valdemar.pavesi | create | |
|