#!/usr/bin/python
#Author: Andrew Nystrom (awnystrom@gmail.com)
#This file is part of Alexa Scraper.
#Alexa Scraper is free software: you can redistribute it and/or modify
#it under the terms of the GNU General Public License as published by
#the Free Software Foundation, either version 3 of the License, or
#(at your option) any later version.
#This program is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#GNU General Public License for more details.
#You should have received a copy of the GNU General Public License
#along with this program. If not, see <http://www.gnu.org/licenses/>.
import threading
from socket import *
import re
import time
import select
import zlib
import sys
import protocol
import Listen
# Network address and port this query server binds to.
# (Other host choices tried: gethostbyname("querier.saves-the-whales.com"), gethostname().)
myHost = '192.168.1.64'
myPort = 2007
# Backlog of not-yet-accepted connections passed to listen().
MAX_QUEUED_CONNEXIONS = 10
print(myHost)
import psycopg2
# Reference DDL for the google_hits table this server reads and updates.
# It is a bare string literal, so it has no effect at runtime. The
# in_field_ts index matches the "ORDER BY in_field, ts" used to pick the
# next query to hand out.
"""
DB Schema
CREATE TABLE google_hits(
ngram varchar NOT NULL,
seed_word varchar NOT NULL,
neighborhood int NOT NULL,
hits bigint DEFAULT -1,
in_field boolean NOT NULL DEFAULT false,
ts timestamp without time zone NOT NULL DEFAULT now(),
PRIMARY KEY (ngram, seed_word, neighborhood)
);
CREATE INDEX in_field_ts ON google_hits (in_field, ts);
"""
# Sample result-count text the regexes below look for:
#   About 69,300,000 results
#   Page 100 of about 983,000,000 results
# NOTE(review): the HTML markup that surrounded these samples and the
# patterns appears to have been lost from this file; the patterns below keep
# only the surviving text. They were likely anchored on Google's result-stats
# markup - confirm and restore the anchors if they are known.
results_regex = re.compile(r'.*?([\d,]+) results')
other_results_regex = re.compile(r'Page \d+ of (?:about )?([\d,]+) results')


def extract_hits_from_page(page):
    """Return the hit count Google reports on a search-results page.

    The primary pattern is tried first, then the "Page N of about M results"
    pattern. Thousands separators are stripped before converting to int;
    matches consisting only of commas are skipped instead of crashing.

    Returns 0 when the page says nothing matched, and None (after printing a
    warning) when no hit count can be found.
    """
    for regex in (results_regex, other_results_regex):
        for match in regex.findall(page):
            digits = match.replace(',', '')
            if digits:
                return int(digits)
    if 'did not match any documents' in page or 'No results found for' in page:
        return 0
    print('WARNING: RETURNING NONE FOR HITS!!!')
    return None
# Google search URL template; the percent-encoded query replaces the %s.
_GOOGLE_SEARCH_URL = ('http://www.google.com./search?gcx=w&sourceid=chrome'
                      '&client=ubuntu&channel=cs&ie=UTF-8&q=%s'
                      '&filter=0&num=10&start=990')


def _percent_encode_all(text):
    """Percent-encode every byte of text as a two-digit lowercase hex escape.

    Text that is not already a byte string is UTF-8 encoded first (the URL
    declares ie=UTF-8). Example: '"a"' -> '%22%61%22'.
    """
    if not isinstance(text, bytes):
        text = text.encode('utf-8')
    # bytearray iterates as ints on both Python 2 and 3; %02x keeps the
    # leading zero for bytes below 0x10 (e.g. tab -> %09, not %9).
    return ''.join('%%%02x' % byte for byte in bytearray(text))


def make_query_url(ngram, seed_word, neighborhood):
    """Build the Google search URL for one (ngram, seed_word, neighborhood) query.

    When both ngram and seed_word are non-empty the query is
    '"ngram" AROUND(neighborhood) "seed_word"'; otherwise it is the quoted
    concatenation of the two. Every character of the query is percent-encoded.

    Returns (url, query) where query is the unencoded query text.
    Raises ValueError if both words are given but neighborhood is not > 0.
    """
    if ngram and seed_word:
        if not neighborhood > 0:
            raise ValueError('neighborhood must be > 0, got %r' % (neighborhood,))
        q = '"%s" AROUND(%s) "%s"' % (ngram, neighborhood, seed_word)
    else:
        q = '"' + ngram + seed_word + '"'
    return _GOOGLE_SEARCH_URL % _percent_encode_all(q), q
class LockedSet:
    """A minimal thread-safe set of hashable elements.

    Every operation holds the lock; adding a present element or removing a
    missing one is a silent no-op.
    """

    def __init__(self):
        # A set gives O(1) membership tests instead of a list's O(n).
        self.elems = set()
        self.lock = threading.Lock()

    def has(self, elem):
        """Return True if elem is currently in the set."""
        with self.lock:
            return elem in self.elems

    def add(self, elem):
        """Add elem (no-op if already present)."""
        with self.lock:
            self.elems.add(elem)

    def remove(self, elem):
        """Remove elem (no-op if absent)."""
        with self.lock:
            self.elems.discard(elem)
class MessageLogger:
    """Prints timestamped log lines to stdout, one thread at a time."""

    def __init__(self, fd):
        # fd is stored for file logging, but log_and_display only writes to
        # stdout; nothing writes to fd.
        self.fd = fd
        self.lock = threading.Lock()

    def log_and_display(self, message):
        """Print '<time.ctime()>: <message>' to stdout under the lock."""
        text = time.ctime() + ': ' + message
        # 'with' releases the lock even if print raises (e.g. a closed
        # stdout); otherwise every later logging call would block forever.
        with self.lock:
            print(text)
class DatabaseInteractor:
    """Access to the google_hits table, shared by all client threads.

    One psycopg2 connection and one cursor are used by every thread, so all
    cursor use is serialized through db_lock (psycopg2 cursors must not be
    used from several threads at once). All statements are parameterized so
    quotes or backslashes in ngram/seed_word cannot break the SQL.
    """

    def __init__(self, dsn="dbname='3query_hits_db'"):
        # dsn: libpq connection string; the default is the original database.
        self.conn = psycopg2.connect(dsn)
        self.db_lock = threading.Lock()
        self.cur = self.conn.cursor()

    def _execute_and_commit(self, sql, params):
        """Run one parameterized statement and commit, holding db_lock.

        On failure the transaction is rolled back, so the shared connection
        is not left in an aborted state, and the error is re-raised.
        """
        with self.db_lock:
            try:
                self.cur.execute(sql, params)
                self.conn.commit()
            except Exception:
                self.conn.rollback()
                raise

    def get_next_query(self):
        """Claim the next query to hand to a client and mark it in_field.

        Picks the row not in the field with the oldest ts; if every row is in
        the field, the one with the oldest ts.

        Returns (ngram, seed_word, neighborhood, ts), or
        ('none', 'none', 0, 0) when the table has no rows.
        """
        with self.db_lock:
            try:
                self.cur.execute(
                    "SELECT ngram, seed_word, neighborhood, ts FROM google_hits "
                    "ORDER BY in_field ASC, ts ASC LIMIT 1;")
                row = self.cur.fetchone()
                if row is None:
                    # Nothing to hand out; replace the shared cursor with a fresh one.
                    self.cur.close()
                    self.cur = self.conn.cursor()
                    return 'none', 'none', 0, 0
                ngram, seed_word, neighborhood, ts = row
                neighborhood = int(neighborhood)
                self.cur.execute(
                    "UPDATE google_hits SET in_field='t' "
                    "WHERE ngram=%s AND seed_word=%s AND neighborhood=%s;",
                    (ngram, seed_word, neighborhood))
                self.conn.commit()
                return ngram, seed_word, neighborhood, ts
            except Exception:
                # A failed statement aborts the transaction for every thread
                # sharing this connection until it is rolled back.
                self.conn.rollback()
                raise

    def update_hits(self, ngram, seed_word, neighborhood, hits):
        """Store hits for a query, clear in_field and set ts to now()."""
        self._execute_and_commit(
            "UPDATE google_hits SET hits=%s, in_field='f', ts=now() "
            "WHERE ngram=%s AND seed_word=%s AND neighborhood=%s;",
            (hits, ngram, seed_word, neighborhood))

    def cancel_query(self, ngram, seed_word, neighborhood, ts):
        """Clear in_field for a claimed query without hits, restoring its ts."""
        self._execute_and_commit(
            "UPDATE google_hits SET in_field='f', ts=%s::timestamp without time zone "
            "WHERE ngram=%s AND seed_word=%s AND neighborhood=%s;",
            (ts, ngram, seed_word, neighborhood))

    def close(self):
        """Close the shared cursor, commit, and close the connection."""
        with self.db_lock:
            self.cur.close()
            self.conn.commit()
            self.conn.close()
# TODO: put the headers in too, and encrypt everything.
def recv_all(s):
    """Read from socket s until the peer closes the connection; return the bytes.

    Note: a second `def recv_all` later in this module rebinds this name.
    """
    chunks = []
    while True:
        chunk = s.recv(4096)
        # An empty read means the peer closed its end. Testing truthiness
        # (not "!= ''") also terminates for Python 3 bytes.
        if not chunk:
            break
        chunks.append(chunk)
    return b''.join(chunks)
def decrypt_message(message):
    """Split message into CHUNK_SIZE pieces, RSA-decrypt each, and join them.

    Each chunk is passed to rsa.pkcs1.decrypt with PRIVATE_KEY, and the
    plaintexts are concatenated in order.

    NOTE(review): CHUNK_SIZE, rsa and PRIVATE_KEY do not appear to be defined
    or imported anywhere in this module, so calling this would raise
    NameError; nothing in this file calls it. TODO: wire up or remove.
    """
    chunks = []
    i = 0
    while True:
        next_chunk = message[i:i+CHUNK_SIZE]
        # An empty slice means the whole message has been consumed.
        if next_chunk == '':
            break
        chunks.append(next_chunk)
        i += CHUNK_SIZE
    decrypt_chunks = map(lambda chunk: rsa.pkcs1.decrypt(chunk, PRIVATE_KEY), chunks)
    return ''.join(decrypt_chunks)
class SimpleServeClient(threading.Thread):
    """Thread that feeds search queries to one connected scraper client.

    While the shared listener thread is alive it repeatedly:
    - claims a query from the database;
    - sends the client its Google URL (framed by protocol.package_for_protocol);
    - reads back the client's response and extracts the hit count from the
      returned page;
    - stores the hit count.

    Serving stops on any error, on an incomplete response, or on a page with
    no recognizable hit count. The client's IP slot in ip_set and its socket
    are always released when the thread finishes.
    """

    # Seconds to keep reading a client's response before giving up on it.
    RESPONSE_TIMEOUT = 20

    def __init__(self, conn, addr, db_machine, logger, listener, ip_set):
        threading.Thread.__init__(self)
        self.conn = conn
        self.addr = addr
        # Per-operation socket timeout (seconds) for send/recv on this client.
        self.conn.settimeout(30)
        self.logger = logger
        self.db_machine = db_machine
        self.listener = listener
        self.ip_set = ip_set

    def run(self):
        try:
            self.logger.log_and_display('Serving a client %s in %s' % (self.addr, self.getName()))
            while self.listener.is_alive():
                try:
                    if not self._serve_one_query():
                        break
                except Exception as exc:
                    # Include the error so failures are diagnosable.
                    self.logger.log_and_display('Crap ' + str(self.addr) + ' ' + repr(exc))
                    break
            self.logger.log_and_display( 'Connexion closed!' )
        finally:
            self.ip_set.remove(self.addr[0])
            self._close_connection()

    def _serve_one_query(self):
        """Run one query/response round with the client.

        Returns True to keep serving this client, False to stop.
        """
        result = self.db_machine.get_next_query()
        if result is None:
            return True
        # The row's previous ts is only needed if the query is cancelled.
        ngram, seed_word, neighborhood, _ts = result
        if not self.listener.is_alive():
            return False
        query_url = make_query_url(ngram, seed_word, neighborhood)[0]
        request = protocol.package_for_protocol(query_url)
        if not self.listener.is_alive():
            return False
        self.conn.sendall(request)
        if not self.listener.is_alive():
            return False

        # The client answers with the fetched web page.
        response = self._receive_response()
        if not protocol.protocol_condition(response):
            # Incomplete frame: timed out, listener stopped, or client disconnected.
            self.logger.log_and_display('WTF? ' + str(self.addr))
            return False
        if not self.listener.is_alive():
            return False

        payload = zlib.decompress(response[protocol.hash_len():])
        # SECURITY: eval() executes arbitrary code sent by the client, so
        # anyone able to connect can run code on this server. If the client
        # sends a plain dict literal, ast.literal_eval would be a safe
        # replacement - verify against the client's serializer first.
        reply = eval(payload)
        if reply['query'] != query_url:
            self.logger.log_and_display('Wrong page sent back ' + str(self.addr) + query_url)
            return True
        page = reply['page']
        if not self.listener.is_alive():
            return False
        hits = extract_hits_from_page(page)
        if hits is None:
            # No recognizable hit count on the page; stop serving this client.
            return False
        # Keep a copy of every page that yielded a hit count.
        with open('./pages/' + str(time.time()), 'w') as page_file:
            page_file.write(page)
        if not self.listener.is_alive():
            return False
        self.db_machine.update_hits(ngram, seed_word, neighborhood, hits)
        log_message = "Updated ngram='%s', seed_word='%s', neighborhood='%s', hits='%s'" % (ngram, seed_word, neighborhood, hits)
        self.logger.log_and_display(log_message + ' ' + str(self.addr))
        return True

    def _receive_response(self):
        """Read the client's reply until protocol.protocol_condition accepts it.

        Returns whatever arrived. Stops early when the listener stops, the
        client closes the connection, or RESPONSE_TIMEOUT seconds pass.
        """
        response = b''
        start_time = time.time()
        while self.listener.is_alive():
            data = self.conn.recv(4096)
            if not data:
                # Peer closed: recv() would keep returning empty data and spin.
                break
            response += data
            if (protocol.protocol_condition(response)
                    or time.time() - start_time > self.RESPONSE_TIMEOUT):
                break
        return response

    def _close_connection(self):
        """Best-effort shutdown and close of the client socket."""
        # Errors are ignored: the socket may already be closed or never set.
        try:
            self.conn.shutdown(SHUT_RDWR)
        except Exception:
            pass
        try:
            self.conn.close()
        except Exception:
            pass

    def __del__(self):
        # Safety net for threads whose run() never executed.
        self._close_connection()
def recv_all(s):
    """Read from socket s until the peer closes the connection; return the bytes.

    This definition rebinds the earlier recv_all in this module.
    """
    chunks = []
    while True:
        chunk = s.recv(4096)
        # An empty read means the peer closed its end. Testing truthiness
        # (not "!= ''") also terminates for Python 3 bytes.
        if not chunk:
            break
        chunks.append(chunk)
    return b''.join(chunks)
def listen_for_clients():
    """Accept scraper clients and serve each from its own SimpleServeClient thread.

    Runs until the Listen thread stops. Only one connection per client IP is
    served at a time; extra connections from an IP already being served (or
    accepted after the listener stopped) are closed. On exit the listening
    socket, the database connection and the log file are closed.
    """
    s = socket(AF_INET, SOCK_STREAM)  # TCP socket
    s.bind((myHost, myPort))
    s.listen(MAX_QUEUED_CONNEXIONS)  # backlog of pending connections
    db_machine = DatabaseInteractor()
    listener = Listen.Listen(end_message='Listener stopping...')
    listener.start()
    ip_set = LockedSet()
    fd = open('log.txt', 'a')
    logger = MessageLogger(fd)
    logger.log_and_display('Starting up server...')
    # Short accept() timeout so the loop re-checks listener.is_alive() often.
    s.settimeout(0.1)
    try:
        while listener.is_alive():
            try:
                conn, addr = s.accept()
            except timeout:
                continue
            except Exception as exc:
                # Log real accept() failures and back off instead of spinning.
                logger.log_and_display('accept() failed: %r' % (exc,))
                time.sleep(0.1)
                continue
            if listener.is_alive() and not ip_set.has(addr[0]):
                client_server = SimpleServeClient(conn, addr, db_machine, logger, listener, ip_set)
                # Daemon threads do not keep the process alive after shutdown.
                client_server.daemon = True
                ip_set.add(addr[0])
                client_server.start()
            else:
                # Not serving this connection: close it rather than leak it.
                conn.close()
    finally:
        print('About to close the listener socket.')
        s.close()
        logger.log_and_display('Shutting down...')
        # NOTE(review): DatabaseInteractor.close() does not reset in_field,
        # despite this message.
        logger.log_and_display('Setting in_field to false for all queries...')
        db_machine.close()
        fd.close()
if __name__ == '__main__':
    # Script entry point: run the server; any exception propagates with its traceback.
    listen_for_clients()