多线程获取google不同国家的账户消耗

Quota exceeded for quota metric 'Get requests' and limit 'Get requests per minute' of service 'googleads.googleapis.com' for consumer 'project_number:'

# coding: utf-8

from __future__ import absolute_import
import os
import sys
from functools import partial
import argparse

import datetime
import time
import logging
import signal
import threading

#from gevent.pool import Pool
#from gevent.pool import Group
#from gevent import Greenlet
#import gevent

# gevent monkey patch -- grpc 不能用
# async 也不行

# 使用多綫程

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

import google.ads.google_ads.client
import google.oauth2.credentials


# logging.getLogger('google.ads.google_ads.client').setLevel(logging.INFO)


def wrapper(retry_time, func):
    @wraps(func)
    def inner(*args, **kw):
        nonlocal retry_time
        while retry_time > 0:
            try:
                ret = func(*args, **kw)
                return ret
            except ResourceExhausted as e:
                print(f" 請求資源過多異常 with {e.msg}")
                time.sleep(10)
                retry_time -= 1
            except Exception as e :
                print(f"retried with 其他异常: {str(e)}")
                retry_time -= 1
        return func(*args, **kw)

    return inner

def retry(retry_time):
    return partial(wrapper, retry_time)

TIME_FORMAT = "%Y-%m-%d"

class GoogleThreadHandler():

    def __init__(self, user_credentials):
        self.credentials = user_credentials["credentials"]
        self.user_name = user_credentials["user_name"]
        self.customer_under_user = []
        self.customer_under_mcc = []
        self.error_customer_under_mcc = []
        self.account_cost_dict = dict()
        self.client = self._init()


    def _run(self):
        resource_names = self.get_customer_names()
        with ThreadPoolExecutor(max_workers=50) as executor:
            to_do = []
            for resource_name in resource_names:
                future = executor.submit(self.get_mcc_and_account_under_user, resource_name)
                to_do.append(future)

            mcc_under_user_list = []
            for future in as_completed(to_do):
                mcc = future.result()
                if mcc is not None:
                    mcc_under_user_list.append(mcc)

        # 1 获取user下面的 mcc 并把 user下的account放入 customer_under_user列表中

        with ThreadPoolExecutor(max_workers=50) as executor:
            executor.map(self.get_customer_under_mcc, mcc_under_user_list)
        # 2 依次把各个层级下的mcc 中的 普通 customer account 放入 customer_under_mcc 列表

        # 3 获取 account 的 消耗数据
        time.sleep(0.05)
        with ThreadPoolExecutor(max_workers=50) as executor:
            executor.map(
                self.get_customer_cost_last_day, self.customer_under_mcc
        )
        time.sleep(0.05)
        with ThreadPoolExecutor(max_workers=50) as executor:
            executor.map(
                self.get_customer_cost_last_day, self.error_customer_under_mcc
        )

        return self.account_cost_dict


        # 4 数据filter 后汇总, 发送邮件


    def _init(self, parent_mcc=None):
        credentials = google.oauth2.credentials.Credentials(**self.credentials)
        if parent_mcc is None:
            client = google.ads.google_ads.client.GoogleAdsClient(credentials, settings.GG_DEVELOP_TOKEN)
        else:
            client = google.ads.google_ads.client.GoogleAdsClient(credentials, settings.GG_DEVELOP_TOKEN, login_customer_id=parent_mcc)
        return client

    def get_customer_names(self):
        customer_service = self.client.get_service('CustomerService', version='v2')
        customers = customer_service.list_accessible_customers()
        resource_names = customers.resource_names
        return resource_names


    def get_mcc_and_account_under_user(self, resource_name):
        ga_service = self.client.get_service('GoogleAdsService', version='v2')
        customer_id = resource_name.split("/")[-1]
        query = """
                                 SELECT
                                    customer.manager,
                                    customer.test_account,
                                    customer.time_zone,
                                    customer.currency_code,
                                    customer.descriptive_name,
                                    customer.id
                               FROM
                                   customer
                               """

        try:
            customer_results = ga_service.search(customer_id, query=query)
            for c in customer_results:

                if c.customer.test_account.value:
                    return None

                if not c.customer.manager.value:
                    self.customer_under_user.append(str(c.customer.id.value))

                if  c.customer.manager.value:
                    return str(c.customer.id.value)
        except:
            return None

    def get_customer_cost_last_day(self, ga_service_customer_id):
        customer_id, ga_service, parent_mcc = ga_service_customer_id
        query_cost = """
                        SELECT
                             customer.descriptive_name,
                             metrics.cost_micros
                        FROM
                            customer
                        WHERE
                            segments.date BETWEEN '{}' AND '{}'
                        LIMIT
                            100

                    """.format(YESTERDAY, YESTERDAY)
        query_payments = """
                           SELECT
                            billing_setup.payments_account,
                            billing_setup.start_date_time,
                            billing_setup.payments_account_info.payments_account_name,
                            billing_setup.payments_account_info.payments_profile_name
                           FROM
                               billing_setup
                           WHERE 
                               billing_setup.status IN ('APPROVED', 'APPROVED_HELD')
                           ORDER BY
                               billing_setup.start_date_time DESC
                           """

        try:
            results_payments = ga_service.search(customer_id, query=query_payments)
            for p in results_payments:
                payments_profile_name = p.billing_setup.payments_account_info.payments_profile_name.value
                bill_start_time = p.billing_setup.start_date_time.value.split()[0]
                # 按时间逆序, 最新的bill是在使用的
                if compare_time(YESTERDAY, bill_start_time):
                    # 该天的消耗, 产生在这条bill后
                    if payments_profile_name != "BlueVision Interactive Limited":
                        logging.info(f"不算消耗, {payments_profile_name},cost_date: {YESTERDAY}, bill_start_time: {bill_start_time}, customer_id:  {customer_id}, MCC:  {parent_mcc}")
                        return

                    else:
                        logging.info(f"计算消耗, {payments_profile_name},cost_date: {YESTERDAY}, bill_start_time: {bill_start_time}, customer_id:  {customer_id}, MCC:  {parent_mcc}")
                        results = ga_service.search(customer_id, query=query_cost)
                        account_name = ""
                        cost = 0
                        account_id = google_format(customer_id)
                        for r in results:
                            account_name = r.customer.descriptive_name.value
                            try:
                                cost = r.metrics.cost_micros.value
                            except:
                                cost = ""
                            else:
                                if cost > 0:
                                    cost = format_cost_float(micros_to_currency(cost))

                        tuple_data = (
                            account_id,
                            account_name,
                            cost
                        )
                        self.account_cost_dict[account_id] = tuple_data
                        return
                else:
                    continue


        except Exception as e:
            self.error_customer_under_mcc.append((customer_id, ga_service, parent_mcc))
            logging.error(f"Error with, {YESTERDAY}, {customer_id}, {parent_mcc}, reason: {e}")
            ...

    def get_customer_under_mcc(self, mcc_id):
        if mcc_id:
            query = """
            SELECT
                 customer_client.id,
                 customer_client.descriptive_name,
                 customer_client.currency_code,
                 customer_client.time_zone,
                 customer_client.manager,
                 customer_client.level
            FROM
                customer_client
                """

            client = self._init(mcc_id)
            ga_service = client.get_service('GoogleAdsService', version='v2')

            # 嵌套 共用一个client , 防止每次递归都重新创建

            def inner(parent_mcc, mcc_id):
                mcc_list = []
                try:
                    results = ga_service.search(customer_id=mcc_id, query=query, retry=False)
                    for item in results:
                        customer_id = str(item.customer_client.id.value)
                        is_manager = item.customer_client.manager.value
                        level = item.customer_client.level.value

                        if is_manager == False:
                            self.customer_under_mcc.append((customer_id, ga_service, parent_mcc))

                        if level !=0 and is_manager == True:
                            mcc_list.append(customer_id)

                except Exception as e:
                    logging.info(f"获取customer失败, 原因: {e}")

                time.sleep(0.05)
                with ThreadPoolExecutor(max_workers=50) as e:
                    e.map(
                        partial(inner, parent_mcc),
                        mcc_list
                    )

            inner(mcc_id, mcc_id)
        else:
            ...

#class GoogleGeventHandler(Greenlet):
#
#    def __init__(self, user_credentials, pool, group):
#        Greenlet.__init__(self)
#        self.credentials = user_credentials["credentials"]
#        self.user_name = user_credentials["user_name"]
#        self.group = group
#        self.pool = pool
#        self.customer_under_user = []
#        self.customer_under_mcc = []
#        self.account_cost_set = set()
#        self.client = self._init()
#
#
#    def _run(self):
#        resource_names = self.get_customer_names()
#        # igroup = Group()
#        mcc_under_user_list = self.group.imap_unordered(self.get_mcc_and_account_under_user, resource_names)
#        # 1 获取user下面的 mcc 并把 user下的account放入 customer_under_user列表中
#
#        self.pool.map(self.get_customer_under_mcc, mcc_under_user_list)
#        # 2 依次把各个层级下的mcc 中的 普通 customer account 放入 customer_under_mcc 列表
#
#        # 3 获取 account 的 消耗数据
#        # self.pool_for_customer.map(
#        #     self.get_customer_cost_last_day, self.customer_under_mcc
#        # )
#        # self.group.map(
#        #     self.get_customer_cost_last_day, self.customer_under_mcc
#        # )
#
#        # 4 数据filter 后汇总, 发送邮件
#
#
#    def _init(self, parent_mcc=None):
#        credentials = google.oauth2.credentials.Credentials(**self.credentials)
#        if parent_mcc is None:
#            client = google.ads.google_ads.client.GoogleAdsClient(credentials, settings.GG_DEVELOP_TOKEN)
#        else:
#            client = google.ads.google_ads.client.GoogleAdsClient(credentials, settings.GG_DEVELOP_TOKEN, login_customer_id=parent_mcc)
#        return client
#
#    def get_customer_names(self):
#        customer_service = self.client.get_service('CustomerService', version='v2')
#        customers = customer_service.list_accessible_customers()
#        resource_names = customers.resource_names
#        return resource_names
#
#
#    def get_mcc_and_account_under_user(self, resource_name):
#        ga_service = self.client.get_service('GoogleAdsService', version='v2')
#        customer_id = resource_name.split("/")[-1]
#        query = """
#                                 SELECT
#                                    customer.manager,
#                                    customer.test_account,
#                                    customer.time_zone,
#                                    customer.currency_code,
#                                    customer.descriptive_name,
#                                    customer.id
#                               FROM
#                                   customer
#                               """
#
#        try:
#            customer_results = ga_service.search(customer_id, query=query)
#            for c in customer_results:
#
#                if c.customer.test_account.value:
#                    return None
#
#                if not c.customer.manager.value:
#                    self.customer_under_user.append(str(c.customer.id.value))
#
#                if  c.customer.manager.value:
#                    return str(c.customer.id.value)
#        except:
#            return None
#
#    def get_customer_cost_last_day(self, ga_service_customer_id):
#        customer_id, ga_service = ga_service_customer_id
#        query = """
#                        SELECT
#                             customer.descriptive_name,
#                             metrics.cost_micros
#                        FROM
#                            customer
#                        WHERE
#                            segments.date BETWEEN '{}' AND '{}'
#                        LIMIT
#                            100
#
#                    """.format(YESTERDAY, YESTERDAY)
#
#        try:
#            results = ga_service.search(customer_id, query=query)
#            # data = {
#            #     "account_id": google_format(customer_id),
#            #     "account_name":"",
#            #     "cost": 0,
#            #     "dt": YESTERDAY,
#            # }
#            #
#            account_name = ""
#            cost = 0
#            account_id = google_format(customer_id)
#            for r in results:
#                account_name = r.customer.descriptive_name.value
#                try:
#                    cost = r.metrics.cost_micros.value
#                except:
#                    cost = ""
#                else:
#                    if cost > 0:
#                        cost = micros_to_currency(cost)
#            tuple_data = (
#                account_id,
#                account_name,
#                cost
#            )
#            self.account_cost_list.append(tuple_data)
#            # logging.info(data)
#            # logging.info(len(self.account_cost_list))
#        except:
#            ...
#
#    def get_customer_under_mcc(self, mcc_id):
#        if mcc_id:
#            query = """
#            SELECT
#                 customer_client.id,
#                 customer_client.descriptive_name,
#                 customer_client.currency_code,
#                 customer_client.time_zone,
#                 customer_client.manager,
#                 customer_client.level
#            FROM
#                customer_client
#                """
#
#            client = self._init(mcc_id)
#            ga_service = client.get_service('GoogleAdsService', version='v2')
#
#            # 嵌套 共用一个client , 防止每次递归都重新创建
#
#            def inner(parent_mcc, mcc_id):
#                # logging.info(len(self.customer_under_mcc))
#                mcc_list = []
#                try:
#                    results = ga_service.search(customer_id=mcc_id, query=query, retry=False)
#                    for item in results:
#                        customer_id = str(item.customer_client.id.value)
#                        is_manager = item.customer_client.manager.value
#                        level = item.customer_client.level.value
#
#                        if is_manager == False:
#                            self.customer_under_mcc.append((customer_id, ga_service))
#                            # self.pool()
#                            self.pool.spawn(self.get_customer_cost_last_day ,(customer_id, ga_service))
#                            # logging.info(g.value)
#
#                        if level !=0 and is_manager == True:
#                            mcc_list.append(customer_id)
#
#                except Exception as e:
#                    logging.info(e)
#
#                # self.pool.map(
#                #     partial(self.get_customer_cost_last_day, ga_service),
#                #     customer_ids
#                # )
#
#                # self.pool_for_mcc.map(
#                #     partial(inner, parent_mcc),
#                #     mcc_list
#                # )
#                self.pool.map(
#                    partial(inner, parent_mcc),
#                    mcc_list
#                )
#
#            inner(mcc_id, mcc_id)
#        else:
#            ...


def micros_to_currency(micros):
    return micros / 1000000.0

def currency_to_micros(currency_money):
    return currency_money * 1000000


def google_format(n):
    # 1234567890 -> 123-456-7890
    n = str(n)
    return format(int(n[:-1]), ",").replace(",", "-") + n[-1]

def format_cost_float(cost):
    # cost 6位小数
    return round(cost + 0.00000001, 2)
    # @todo 17.955 -> 计算机的存储中是17.954999999999...


def compare_time(time_compare, time_to_compare):
    TIME_FORMAT = "%Y-%m-%d"

    # 判断 bill时间 是否 早于 cost的时间
    time1 = datetime.datetime.strptime(time_compare, TIME_FORMAT)
    time2 = datetime.datetime.strptime(time_to_compare, TIME_FORMAT)
    if time2 <= time1:
        return True
    return False



if __name__ == '__main__':
    # signal.signal(signal.SIGTERM, gevent.kill)
    BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    YESTERDAY = (datetime.datetime.today() - datetime.timedelta(days=1)).strftime(TIME_FORMAT)

    parser = argparse.ArgumentParser(description='google account cost')
    parser.add_argument("-p", "--produce_env", type=str,
                        help="环境, offline vs production",
                        default="offline")

    parser.add_argument('-d', '--execute_dir', type=str,
                    help='执行目录',
                    default=BASE_DIR)

    parser.add_argument('-date', '--cost_date', type=str,
                    help='获取消耗的日期, YYYY-MM-DD',
                    default=YESTERDAY)

    parser.add_argument('-task', '--task_type', type=str,
                    help='任务类型, email vs db',
                    default="email")

    args = parser.parse_args()
    sys.path.append(os.path.join(args.execute_dir, "conf"))
    sys.path.append(args.execute_dir)
    sys.path.append(BASE_DIR)
    sys.path.append(os.path.join(BASE_DIR, "db"))


    logConfFile = os.path.join(args.execute_dir, 'conf/logging.cfg')
    logging.config.fileConfig(logConfFile)
    os.environ['DJANGO_SETTINGS_MODULE'] = r'settings'

    from django.conf import settings
    from utils import CostExcelHandler

    env = args.produce_env
    if env.lower() == "offline":
        os.environ['https_proxy'] = 'xxx'
    else:
        ...

    YESTERDAY = args.cost_date

    start = time.perf_counter()
    tasks = [
          GoogleThreadHandler(user_conf) for user_conf in settings.USER_INFO_LIST
        ]
    final_dict = dict()
    for g in tasks:
        account_dict = g._run()
        final_dict.update(account_dict)


    handler = CostExcelHandler("google_cost", final_dict, YESTERDAY)
    task_type = args.task_type
    if task_type.lower() == "db":
        handler.save_cost_into_db()
    else:
        handler.send_email()
    logging.info(f"google_account cost with task: {task_type}, date: {YESTERDAY},"
                 f"env: {env}, exe_dir: {args.execute_dir}. finished after time spent: {(time.perf_counter() - start) / 60 } mins" )


query = """
SELECT
     customer.descriptive_name,
     customer.id,
     metrics.cost_micros,
      geographic_view.country_criterion_id
FROM
      geographic_view
WHERE
     segments.date BETWEEN '{}' AND '{}'
AND 
     metrics.cost_micros > 0
""".format("2020-06-01","2020-06-17")


   

Buy me a 肥仔水!