#!/usr/bin/python
# Author: Andrew Nystrom (awnystrom@gmail.com)
#
# This file is part of Alexa Scraper.
#
# Alexa Scraper is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# Server side of the scraper: hands Google search URLs to connected clients
# and records the hit counts extracted from the pages they send back.

# --- standard library -------------------------------------------------------
import re
import select
import sys
import threading
import time
import zlib
from socket import *  # socket(), AF_INET, SOCK_STREAM, SHUT_RDWR, error, ...

# --- third party ------------------------------------------------------------
import psycopg2

# --- project modules --------------------------------------------------------
import Listen
import protocol

# Address the server socket binds to. Alternatives used in the past:
#   gethostbyname("querier.saves-the-whales.com")  /  gethostname()
myHost = '192.168.1.64'
print(myHost)
myPort = 2007
# Backlog passed to listen(): pending connections not yet accepted.
MAX_QUEUED_CONNEXIONS = 10

# DB schema used by this server:
#
#   CREATE TABLE google_hits(
#       ngram varchar NOT NULL,
#       seed_word varchar NOT NULL,
#       neighborhood int NOT NULL,
#       hits bigint DEFAULT -1,
#       in_field boolean NOT NULL DEFAULT false,
#       ts timestamp without time zone NOT NULL DEFAULT now(),
#       PRIMARY KEY (ngram, seed_word, neighborhood)
#   );
#   CREATE INDEX in_field_ts ON google_hits (in_field, ts);
# Sample "result stats" snippets from Google results pages, which the two
# patterns below are meant to match:
#   About 69,300,000 results
#   Page 100 of about 983,000,000 results
#
# NOTE(review): the HTML markup that originally anchored these patterns (and
# surrounded the samples above) was stripped when this file was extracted;
# only the line breaks where that markup stood survived, and they are kept
# below as '\n'. Restore the exact markup from version control -- as written
# the patterns only match stats text that directly follows a newline.
results_regex = re.compile(r'\n.*?([\d,]+) results')
other_results_regex = re.compile(r'\nPage \d+ of (?:about )?([\d,]+) results\n')


def extract_hits_from_page(page):
    """Return the estimated hit count printed on a Google results page.

    Tries ``results_regex`` first, then ``other_results_regex``.
    Returns 0 when the page says nothing matched, and None (after printing a
    warning) when no hit count can be found at all.
    """
    results = results_regex.findall(page) or other_results_regex.findall(page)
    if not results:
        if 'did not match any documents' in page or 'No results found for' in page:
            return 0
        print('WARNING: RETURNING NONE FOR HITS!!!')
        return None
    # Strip the thousands separators: '69,300,000' -> 69300000.
    return int(re.sub(r'\D', '', results[0]))


def _percent_encode(text):
    """Percent-encode every byte of *text* as ``%xx`` (two lowercase hex digits).

    Byte strings are encoded as-is; text (unicode) is UTF-8 encoded first,
    matching the ``ie=UTF-8`` parameter of the search URL.
    """
    if not isinstance(text, bytes):
        text = text.encode('utf-8')
    # '%02x' zero-pads, so bytes below 0x10 become e.g. '%09' rather than
    # the invalid '%9' the old hex()-based encoding produced.
    return ''.join('%%%02x' % octet for octet in bytearray(text))


def make_query_url(ngram, seed_word, neighborhood):
    """Build the Google search URL for one (ngram, seed_word, neighborhood) query.

    If both *ngram* and *seed_word* are non-empty the query is
    ``"ngram" AROUND(neighborhood) "seed_word"`` and *neighborhood* must be
    positive. Otherwise it is the exact phrase of their concatenation.

    Returns ``(url, q)``, where *q* is the unencoded query text.
    Raises ValueError for a proximity query with ``neighborhood <= 0``.
    """
    if ngram and seed_word:
        if not neighborhood > 0:
            raise ValueError('neighborhood must be > 0, got %r' % (neighborhood,))
        q = '"%s" AROUND(%s) "%s"' % (ngram, neighborhood, seed_word)
    else:
        q = '"' + ngram + seed_word + '"'
    # The whole query is percent-encoded, so the URL can contain no spaces
    # (the old space-collapsing post-processing was a no-op and is gone).
    # filter=0 / num=10 / start=990 are passed through unchanged; start=990
    # presumably requests the last results page (cf. "Page 100 of ..." above).
    url = ('http://www.google.com./search?gcx=w&sourceid=chrome&client=ubuntu'
           '&channel=cs&ie=UTF-8&q=%s&filter=0&num=10&start=990' % (_percent_encode(q),))
    return url, q


class LockedSet:
    """A set of hashable items whose operations are serialised by a lock.

    ``listen_for_clients`` uses it to track the client IPs being served.
    """

    def __init__(self):
        self.elems = set()  # set, not list: O(1) membership tests
        self.lock = threading.Lock()

    def has(self, elem):
        """Return True if *elem* is in the set."""
        with self.lock:
            return elem in self.elems

    def add(self, elem):
        """Add *elem*; adding an element that is already present is a no-op."""
        with self.lock:
            self.elems.add(elem)

    def remove(self, elem):
        """Remove *elem*; removing an absent element is a no-op."""
        with self.lock:
            self.elems.discard(elem)


class MessageLogger:
    """Prints timestamped messages, one thread at a time.

    *fd* is stored but not written to (file logging is currently disabled).
    """

    def __init__(self, fd):
        self.fd = fd
        self.lock = threading.Lock()

    def log_and_display(self, message):
        """Print ``'<ctime>: <message>'`` while holding the lock."""
        text = time.ctime() + ': ' + message
        with self.lock:
            print(text)


class DatabaseInteractor:
    """Access to the ``google_hits`` table shared by all client threads.

    A single psycopg2 connection and cursor are shared. Every method holds
    ``db_lock`` while using them, because psycopg2 cursors are not
    thread-safe. All statements are parameterised; psycopg2 does the quoting.
    """

    def __init__(self):
        self.conn = psycopg2.connect("dbname='3query_hits_db'")
        self.db_lock = threading.Lock()
        self.cur = self.conn.cursor()

    def _rollback(self):
        """Abort the current transaction after a failed statement.

        Without a rollback, PostgreSQL rejects every later statement on this
        connection. Any error from the rollback itself is ignored so the
        caller's original exception is the one that propagates.
        Caller must hold ``db_lock``.
        """
        try:
            self.conn.rollback()
        except Exception:
            pass  # connection may already be unusable; keep the first error

    def _execute_and_commit(self, sql, params):
        """Run one parameterised write statement under the lock and commit it.

        On failure, rolls back and re-raises.
        """
        with self.db_lock:
            try:
                self.cur.execute(sql, params)
                self.conn.commit()
            except Exception:
                self._rollback()
                raise

    def get_next_query(self):
        """Check out the next query to hand to a client and mark it in_field.

        Picks the row updated longest ago among those not in the field. If all
        rows are in the field, it picks the row updated longest ago overall.

        Returns ``(ngram, seed_word, neighborhood, ts)``, or
        ``('none', 'none', 0, 0)`` when the table is empty.
        On a database error the transaction is rolled back, the lock is
        released, and the exception propagates.
        """
        with self.db_lock:
            try:
                # in_field ASC puts false before true; ts ASC picks the oldest.
                self.cur.execute("SELECT ngram, seed_word, neighborhood, ts FROM google_hits ORDER BY in_field ASC, ts ASC LIMIT 1;")
                row = self.cur.fetchone()
                if row is None:
                    self.conn.commit()  # end the read-only transaction
                    return 'none', 'none', 0, 0
                ngram, seed_word, neighborhood, ts = row[0], row[1], int(row[2]), row[3]
                self.cur.execute(
                    "UPDATE google_hits SET in_field='t' WHERE ngram=%s AND seed_word=%s AND neighborhood=%s;",
                    (ngram, seed_word, neighborhood))
                self.conn.commit()
            except Exception:
                self._rollback()
                raise
        return ngram, seed_word, neighborhood, ts

    def update_hits(self, ngram, seed_word, neighborhood, hits):
        """Store *hits* for the query, clear in_field, set ts=now(), and commit."""
        self._execute_and_commit(
            "UPDATE google_hits SET hits=%s, in_field='f', ts=now() WHERE ngram=%s AND seed_word=%s AND neighborhood=%s;",
            (hits, ngram, seed_word, neighborhood))

    def cancel_query(self, ngram, seed_word, neighborhood, ts):
        """Return a checked-out query: clear in_field and restore its old *ts*."""
        self._execute_and_commit(
            "UPDATE google_hits SET in_field='f', ts=%s::timestamp without time zone WHERE ngram=%s AND seed_word=%s AND neighborhood=%s;",
            (ts, ngram, seed_word, neighborhood))

    def close(self):
        """Close the cursor, commit, and close the connection.

        Takes the lock so an in-flight statement finishes first.
        """
        with self.db_lock:
            self.cur.close()
            self.conn.commit()
            self.conn.close()


# TODO: put the headers in too, and encrypt everything.

def recv_all(s):
    """Read from socket *s* until the peer closes the connection.

    Returns everything received.
    """
    chunks = []
    while True:
        data = s.recv(4096)
        if not data:
            break
        chunks.append(data)
    return b''.join(chunks)


def decrypt_message(message):
    """Split *message* into CHUNK_SIZE-byte pieces and RSA-decrypt each one.

    NOTE(review): CHUNK_SIZE, rsa and PRIVATE_KEY are neither defined nor
    imported in this file, so calling this raises NameError. Nothing in this
    file calls it. Wire it up or delete it.
    """
    chunks = [message[i:i + CHUNK_SIZE] for i in range(0, len(message), CHUNK_SIZE)]
    return ''.join(rsa.pkcs1.decrypt(chunk, PRIVATE_KEY) for chunk in chunks)


class SimpleServeClient(threading.Thread):
    """Thread that serves one scraping client over its socket.

    Each round:
      1. take the next query from the database;
      2. send its search URL to the client;
      3. wait for the client to return the fetched page;
      4. extract the hit count and store it.
    The thread stops when the listener thread dies or the client misbehaves.
    It always frees the client's IP in *ip_set* and closes the socket.
    """

    # Seconds to wait for a complete response from the client.
    RESPONSE_TIMEOUT = 20

    def __init__(self, conn, addr, db_machine, logger, listener, ip_set):
        threading.Thread.__init__(self)
        self.conn = conn
        self.addr = addr
        self.conn.settimeout(30)  # bounds every blocking send/recv
        self.logger = logger
        self.db_machine = db_machine
        self.listener = listener
        self.ip_set = ip_set

    def run(self):
        self.logger.log_and_display('Serving a client %s in %s' % (self.addr, self.name))
        try:
            while self.listener.is_alive():
                try:
                    if not self._serve_one_query():
                        break
                except Exception as e:
                    self.logger.log_and_display('Crap ' + str(self.addr) + ' ' + repr(e))
                    break
            self.logger.log_and_display('Connexion closed!')
        finally:
            self.ip_set.remove(self.addr[0])
            # Close now instead of waiting for __del__, which only runs once
            # this thread object is garbage-collected.
            self._close_connection()

    def _serve_one_query(self):
        """Run one query round trip with the client.

        Returns True to keep serving this client, False to stop.
        """
        result = self.db_machine.get_next_query()
        if result is None:
            return True
        ngram, seed_word, neighborhood, _ts = result
        if not self.listener.is_alive():
            return False

        query_url, _q = make_query_url(ngram, seed_word, neighborhood)
        message = protocol.package_for_protocol(query_url)
        if not self.listener.is_alive():
            return False
        self.conn.sendall(message)
        if not self.listener.is_alive():
            return False

        response = self._receive_response()
        if not protocol.protocol_condition(response):
            # Incomplete: timeout, client disconnect, or listener stopping.
            self.logger.log_and_display('WTF? ' + str(self.addr))
            return False
        if not self.listener.is_alive():
            return False

        payload = zlib.decompress(response[protocol.hash_len():])
        # SECURITY(review): eval() on data received from a network client runs
        # arbitrary Python code that client sends. The payload appears to be a
        # dict literal with 'query' and 'page' keys (see lookups below).
        # ast.literal_eval would parse that safely -- confirm the client
        # format and switch.
        reply = eval(payload)
        if reply['query'] != query_url:
            self.logger.log_and_display('Wrong page sent back ' + str(self.addr) + query_url)
            return True
        page = reply['page']
        if not self.listener.is_alive():
            return False

        hits = extract_hits_from_page(page)
        if hits is None:
            return False
        with open('./pages/' + str(time.time()), 'w') as page_file:
            page_file.write(page)
        if not self.listener.is_alive():
            return False

        self.db_machine.update_hits(ngram, seed_word, neighborhood, hits)
        log_message = "Updated ngram='%s', seed_word='%s', neighborhood='%s', hits='%s'" % (ngram, seed_word, neighborhood, hits)
        self.logger.log_and_display(log_message + ' ' + str(self.addr))
        return True

    def _receive_response(self):
        """Read the client's reply and return whatever arrived (may be incomplete).

        Reading stops when any of these happens:
          - ``protocol.protocol_condition`` accepts the data;
          - the client closes the connection;
          - RESPONSE_TIMEOUT seconds pass;
          - the listener stops.
        """
        response = ''
        start_time = time.time()
        while self.listener.is_alive():
            data = self.conn.recv(4096)
            if not data:
                # Peer closed the socket. Without this check the loop
                # busy-spun on empty reads until the timeout expired.
                break
            response += data
            if protocol.protocol_condition(response) or time.time() - start_time > self.RESPONSE_TIMEOUT:
                break
        return response

    def _close_connection(self):
        """Best-effort shutdown and close of the client socket.

        Safe to call more than once.
        """
        try:
            self.conn.shutdown(SHUT_RDWR)
        except Exception:
            pass  # already closed, or the peer is gone
        try:
            self.conn.close()
        except Exception:
            pass

    def __del__(self):
        self._close_connection()


def listen_for_clients():
    """Accept clients and serve each in its own SimpleServeClient thread.

    Runs until the Listen thread stops. At most one connection per client IP
    is served at a time; extra connections from a busy IP are closed at once.
    On exit the server socket, the database connection and the log file are
    closed.
    """
    server_sock = socket(AF_INET, SOCK_STREAM)  # TCP
    server_sock.bind((myHost, myPort))
    server_sock.listen(MAX_QUEUED_CONNEXIONS)  # backlog of pending connections

    db_machine = DatabaseInteractor()
    listener = Listen.Listen(end_message='Listener stopping...')
    listener.start()
    ip_set = LockedSet()
    fd = open('log.txt', 'a')
    logger = MessageLogger(fd)
    logger.log_and_display('Starting up server...')

    # Short accept() timeout so the loop notices quickly that the listener stopped.
    server_sock.settimeout(0.1)
    try:
        while listener.is_alive():
            try:
                conn, addr = server_sock.accept()
            except error:  # socket.timeout (a socket.error) is the normal case
                continue
            if listener.is_alive() and not ip_set.has(addr[0]):
                client_server = SimpleServeClient(conn, addr, db_machine, logger, listener, ip_set)
                client_server.daemon = True  # was misspelled 'isdaemon', which had no effect
                ip_set.add(addr[0])
                client_server.start()
            else:
                conn.close()  # previously leaked
    finally:
        print('About to close the listener socket.')
        server_sock.close()
        logger.log_and_display('Shutting down...')
        # NOTE(review): DatabaseInteractor.close() does not reset in_field
        # (that UPDATE was disabled), so this message overstates what happens.
        logger.log_and_display('Setting in_field to false for all queries...')
        db_machine.close()
        fd.close()


if __name__ == '__main__':
    listen_for_clients()