多线程批量请求 Facebook API


利用多线程和任务队列批量请求(本例使用标准库 queue.Queue,也可换成 redis 队列)。


# encoding : utf-8
# __author__ = 'jm'

import logging
import threading
import datetime
import asyncio
import json
import time
import requests
from requests import exceptions
import os, sys
import django
import argparse
from django.db import connections
from utils import MyAdapter
import queue
import pymysql



# Project root: the directory two levels above this script.
BASE_DIR = os.path.dirname(
    os.path.dirname(os.path.abspath(__file__))
)

# Make the project root and its "src" / "data_model" folders importable.
sys.path.extend(
    [
        BASE_DIR,
        os.path.join(BASE_DIR, "src"),
        os.path.join(BASE_DIR, "data_model"),
    ]
)

# Logger shared by every component in this module.
info_logger = logging.getLogger("pull_info")



class MysqlClient(object):
    """Context manager around a short-lived pymysql connection.

    Connection parameters are read from the module-level ``settings`` object
    when the client is created. The connection and a ``DictCursor`` are opened
    on ``__enter__`` and closed again on ``__exit__``.

    NOTE(review): ``settings`` is only bound by the ``__main__`` section of
    this file, so the class is usable only when the file runs as a script.
    """

    def __init__(self):
        self.host = settings.MYSQL_HOST
        self.port = settings.MYSQL_PORT
        # Must be a plain value: a trailing comma here would store a 1-tuple.
        self.user = settings.MYSQL_USER
        self.password = settings.MYSQL_PASSWORD
        self.database = settings.MYSQL_DB
        self.db = None
        self.cursor = None

    def __enter__(self):
        """Open the connection and a dict cursor; return this client."""
        self.db = pymysql.connect(
            host=self.host,
            port=self.port,
            user=self.user,
            password=self.password,
            database=self.database,
        )
        try:
            self.cursor = self.db.cursor(pymysql.cursors.DictCursor)
        except Exception:
            # __exit__ is not called when __enter__ fails, so release the
            # connection here instead of leaking it.
            self.db.close()
            raise
        return self

    def select(self, sql, *params):
        """Execute ``sql`` with positional ``params`` and return all rows.

        Rows are dicts because the cursor is a ``DictCursor``.
        """
        self.cursor.execute(sql, params)
        return self.cursor.fetchall()

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the cursor and the connection; exceptions are not suppressed."""
        try:
            self.cursor.close()
        finally:
            # Close the connection even if closing the cursor raised.
            self.db.close()





class Worker(threading.Thread):
    """Thread that drains a task queue and pulls campaigns for each task.

    Every task is a JSON string of the form
    ``{"access_token": ..., "accounts_ids": [...]}``. The listed ad accounts
    are queried with a single Graph API batch request and the returned
    campaigns are bulk-inserted.
    """

    # Seconds to wait before retrying a batch request after a connection error.
    RETRY_DELAY = 0.5
    # Seconds to pause after each task taken from the queue.
    TASK_INTERVAL = 0.5
    # A task is abandoned once its retry counter exceeds this value.
    MAX_RETRIES = 3

    # NOTE(review): the blank before "account_id" is kept exactly as in the
    # original request string; confirm that the API tolerates it.
    _CAMPAIGNS_URL = (
        "v3.2/act_{}/campaigns?fields=name,objective,can_use_spend_cap,status,"
        "spend_cap,effective_status,promoted_object,daily_budget,"
        "lifetime_budget,start_time,end_time, account_id&limit=1000"
    )

    def __init__(self, task_queue, **kwargs):
        """Store the shared queue; ``kwargs`` are forwarded to ``Thread``."""
        threading.Thread.__init__(self, **kwargs)
        self.task_queue = task_queue
        self.local_thread_ = threading.local()

    def _retry_state(self):
        """Return the current thread's retry dict, creating it on first use."""
        try:
            return self.local_thread_.retry_dict
        except AttributeError:
            self.local_thread_.retry_dict = {}
            return self.local_thread_.retry_dict

    def _post_batch(self, params):
        """POST one batch request and return the decoded JSON response."""
        # The session is closed on exit so its pooled connections are released.
        with requests.Session() as session:
            session.mount("https://", MyAdapter())
            with session.post("https://graph.facebook.com/", params=params) as response:
                return json.loads(response.text)

    def _store_batch_item(self, item):
        """Persist the campaigns contained in one entry of a batch response."""
        base_data = json.loads(item["body"])
        campaign_list = base_data["data"]

        try:
            has_next = "next" in base_data["paging"]
        except (KeyError, TypeError):
            has_next = False
        if has_next:
            # Pagination is best-effort: a failure while following "next"
            # links must not discard the first page that is already here.
            try:
                campaign_list.extend(self.get_next_page_data(base_data, []))
            except Exception:
                info_logger.exception("failed to fetch the remaining campaign pages")

        campaign_bulk = []
        for campaign_data in campaign_list:
            try:
                # Placeholder kept from the source: the construction of the
                # model instance from ``campaign_data`` is elided there.
                campaign_obj = ...
                # Collect into the bulk list; appending to ``campaign_list``
                # here would extend the list being iterated and never finish.
                campaign_bulk.append(campaign_obj)
            except Exception:
                continue
        # ``XXXXXX`` is the (redacted) model class name used by the source.
        XXXXXX.objects.bulk_create(campaign_bulk)

    def _handle(self, account):
        """Fetch and store the campaigns for one queued group of ad accounts.

        Args:
            account: JSON string with the keys ``access_token`` and
                ``accounts_ids``.

        A ``ConnectionError`` from the batch request is retried after
        ``RETRY_DELAY`` seconds until the task's retry counter exceeds
        ``MAX_RETRIES``. Any other request error is logged and the task is
        dropped. Failures of individual batch entries are logged and skipped.
        """
        retry_dict = self._retry_state()

        account_obj = json.loads(account)
        account_ids = account_obj["accounts_ids"]
        access_token = account_obj["access_token"]
        if not account_ids:
            # Nothing to request for an empty group.
            return

        request_list = [
            {"method": "GET", "relative_url": self._CAMPAIGNS_URL.format(account_id)}
            for account_id in account_ids
        ]
        params = {
            "access_token": access_token,
            "batch": json.dumps(request_list),
        }

        while True:
            entry = retry_dict.get(account)
            if entry:
                entry["time"] += 1
            try:
                return_data = self._post_batch(params)
            except exceptions.ConnectionError as err:
                info_logger.info("Timeout: " + str(err))
                if entry:
                    if entry["time"] > self.MAX_RETRIES:
                        return
                else:
                    retry_dict[account] = {"access_token": access_token, "time": 0}
                time.sleep(self.RETRY_DELAY)
                continue
            except Exception:
                info_logger.exception("batch request failed")
                return
            break

        if not isinstance(return_data, list):
            # A batch call answers with a list; anything else (for example an
            # error object) carries no per-account entries to store.
            info_logger.info("unexpected batch response: %s", return_data)
            return

        for item in return_data:
            try:
                self._store_batch_item(item)
            except Exception:
                info_logger.exception("failed to store the campaigns of one batch entry")

    def _handle_account_campaign(self, account):
        """Same as ``_handle``; kept so callers using this name keep working."""
        return self._handle(account)

    def get_next_page_data(self, origin_data, return_data):
        """Follow ``paging.next`` links and collect every page's ``data``.

        Args:
            origin_data: decoded response whose ``paging.next`` is followed.
            return_data: list that the fetched entries are appended to.

        Returns:
            ``return_data``, extended in place. Collection stops at the first
            page that lacks ``paging.next`` or ``data``.
        """
        try:
            next_page_url = origin_data['paging']['next']
        except KeyError:
            return return_data

        # One session serves all pages and is closed when paging is finished.
        with requests.Session() as session:
            session.mount("https://", MyAdapter())
            while True:
                with session.get(next_page_url) as response:
                    page = json.loads(response.text)
                try:
                    return_data.extend(page["data"])
                    next_page_url = page['paging']['next']
                except KeyError:
                    return return_data

    def run(self):
        """Process tasks until the queue is empty, pausing between tasks."""
        while True:
            # The retry bookkeeping is reset for every task.
            self.local_thread_.retry_dict = {}
            try:
                account = self.task_queue.get(False)
            except queue.Empty:
                info_logger.info("campaign_queue is empty now!")
                break

            try:
                self._handle(account)
            except Exception:
                # One bad task must not stop the worker.
                info_logger.exception("unhandled error while processing a task")

            time.sleep(self.TASK_INTERVAL)




class Service(object):
    """Load token/account groups from MySQL and fan the work out to workers."""

    def __init__(self):
        # The account groups are loaded eagerly, before the queue is created.
        self.accounts_list = self._get_accounts_list()
        self.task_queue = queue.Queue()

    def _get_accounts_list(self):
        """Return one ``{"access_token", "accounts_ids"}`` dict per token row."""
        sql = "xxxxxxx"
        tokens = []
        with MysqlClient() as mysql_handler:
            for row in mysql_handler.select(sql):
                tokens.append(row["access_token"])

        return [self._get_account_ids(token) for token in tokens]

    def _get_account_ids(self, access_token):
        """Look up the account ids for ``access_token`` and pair them with it."""
        sql = "xxx"
        account_ids = []
        with MysqlClient() as mysql_handler:
            for row in mysql_handler.select(sql, access_token):
                account_ids.append(row["account_id"])
        return {"access_token": access_token, "accounts_ids": account_ids}

    def _convert_accounts_info(self):
        """Split every group into JSON task strings of at most 50 account ids."""
        chunk_size = 50
        tasks = []
        for group in self.accounts_list:
            ids = group["accounts_ids"]
            token = group["access_token"]
            for start in range(0, len(ids), chunk_size):
                chunk = ids[start:start + chunk_size]
                tasks.append(
                    json.dumps({"access_token": token, "accounts_ids": chunk})
                )
        return tasks

    def run(self):
        """Queue all tasks, start 20 daemon workers and wait for them to end."""
        for task in self._convert_accounts_info():
            self.task_queue.put(task)

        workers = [Worker(self.task_queue) for _ in range(20)]

        for worker in workers:
            worker.daemon = True
            worker.start()

        for worker in workers:
            worker.join()





def pull_account_campaign():
    """Close all Django database connections, then build and run a Service."""
    django.db.connections.close_all()
    service = Service()
    service.run()




if __name__ == '__main__':
    # Script entry point. The only option is the execution directory, which
    # defaults to BASE_DIR.
    ap = argparse.ArgumentParser()
    ap.add_argument('-d', '--execute_dir', type=str,
                    help='执行目录',
                    default=BASE_DIR)
    args = ap.parse_args()
    # Make <execute_dir>/conf importable before Django is configured
    # (presumably the "django_settings" module named below lives there).
    sys.path.append(os.path.join(args.execute_dir, "conf"))
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_settings")
    django.setup()

    # These imports come after django.setup(). The model names are redacted
    # placeholders in this listing. ``settings`` becomes a module-level name
    # here and is the object MysqlClient reads its connection parameters from.
    from data_model.models import  xxxx, xxxxx, xxxxx, xxxxx, xxxx
    from django.conf import settings

    # Rebinds the module-level name to the same "pull_info" logger object.
    info_logger = logging.getLogger("pull_info")
    pull_account_campaign()


Buy me a 肥仔水!