Squeak.ru - шаблоны программирования

Запустите прокси CloudSQL в потоке данных Python / Apache Beam

В настоящее время я работаю над заданием ETL Dataflow (используя Apache Beam Python SDK), которое запрашивает данные из CloudSQL (с psycopg2 и настраиваемым ParDo) и записывает их в BigQuery. Моя цель - создать шаблон потока данных, который я могу запустить из AppEngine, используя задание Cron.

У меня есть версия, которая работает локально с помощью DirectRunner. Для этого я использую прокси-клиент CloudSQL (Postgres), чтобы я мог подключиться к базе данных на 127.0.0.1.

При использовании DataflowRunner с пользовательскими командами для запуска прокси в сценарии setup.py задание не выполняется. Он застревает в повторении этого сообщения журнала:

Setting node annotation to enable volume controller attach/detach

Часть моего setup.py выглядит следующим образом:

CUSTOM_COMMANDS = [
['echo', 'Custom command worked!'],
['wget', 'https://dl.google.com/cloudsql/cloud_sql_proxy.linux.amd64', '-O', 'cloud_sql_proxy'],
['echo', 'Proxy downloaded'],
['chmod', '+x', 'cloud_sql_proxy']]

class CustomCommands(setuptools.Command):
  """A setuptools Command class able to run arbitrary commands."""

  def initialize_options(self):
    pass

  def finalize_options(self):
    pass

  def RunCustomCommand(self, command_list):
    print('Running command: %s' % command_list)
    logging.info("Running custom commands")
    p = subprocess.Popen(
        command_list,
        stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # Can use communicate(input='y\n'.encode()) if the command run requires
    # some confirmation.
    stdout_data, _ = p.communicate()
    print('Command output: %s' % stdout_data)
    if p.returncode != 0:
      raise RuntimeError(
          'Command %s failed: exit code: %s' % (command_list, p.returncode))

  def run(self):
    for command in CUSTOM_COMMANDS:
      self.RunCustomCommand(command)
    subprocess.Popen(['./cloud_sql_proxy', '-instances=bi-test-1:europe-west1:test-animal=tcp:5432'])

Я добавил последнюю строку как отдельную subprocess.Popen() в run() после прочтения этой проблемы на Github от sthomp и это обсуждение на Stackoverflo. Я также пробовал поиграться с некоторыми параметрами subprocess.Popen.

Другое упомянутое решение от brodin заключалось в том, чтобы разрешить доступ с любого IP-адреса и подключиться через имя пользователя и пароль. Насколько я понимаю, он не считает это наилучшей практикой.

Заранее благодарим вас за помощь.

!!! Обходное решение внизу этого сообщения !!!


Обновление - файлы журнала

Это журналы уровня ошибок, возникающих во время выполнения задания:

E  EXT4-fs (dm-0): couldn't mount as ext3 due to feature incompatibilities 
E  Image garbage collection failed once. Stats initialization may not have completed yet: unable to find data for container / 
E  Failed to check if disk space is available for the runtime: failed to get fs info for "runtime": unable to find data for container / 
E  Failed to check if disk space is available on the root partition: failed to get fs info for "root": unable to find data for container / 
E  [ContainerManager]: Fail to get rootfs information unable to find data for container / 
E  Could not find capacity information for resource storage.kubernetes.io/scratch 
E  debconf: delaying package configuration, since apt-utils is not installed 
E    % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current 
E                                   Dload  Upload   Total   Spent    Left  Speed 
E  
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100  3698  100  3698    0     0  25674      0 --:--:-- --:--:-- --:--:-- 25860 



#-- HERE IS WHEN setup.py FOR MY JOB IS EXECUTED ---

E  debconf: delaying package configuration, since apt-utils is not installed 
E  insserv: warning: current start runlevel(s) (empty) of script `stackdriver-extractor' overrides LSB defaults (2 3 4 5). 
E  insserv: warning: current stop runlevel(s) (0 1 2 3 4 5 6) of script `stackdriver-extractor' overrides LSB defaults (0 1 6). 
E  option = Interval; value = 60.000000; 
E  option = FQDNLookup; value = false; 
E  Created new plugin context. 
E  option = PIDFile; value = /var/run/stackdriver-agent.pid; 
E  option = Interval; value = 60.000000; 
E  option = FQDNLookup; value = false; 
E  Created new plugin context. 

Здесь вы можете найти все журналы после запуска моего настраиваемого файла setup.py (уровень журнала: любой; все журналы):

https://jpst.it/1gk2Z

Обновить файлы журналов 2

Журналы заданий (я вручную отменил задание после того, как некоторое время не зависал):

 2018-06-08 (08:02:20) Autoscaling is enabled for job 2018-06-07_23_02_20-5917188751755240698. The number of workers will b...
 2018-06-08 (08:02:20) Autoscaling was automatically enabled for job 2018-06-07_23_02_20-5917188751755240698.
 2018-06-08 (08:02:24) Checking required Cloud APIs are enabled.
 2018-06-08 (08:02:24) Checking permissions granted to controller Service Account.
 2018-06-08 (08:02:25) Worker configuration: n1-standard-1 in europe-west1-b.
 2018-06-08 (08:02:25) Expanding CoGroupByKey operations into optimizable parts.
 2018-06-08 (08:02:25) Combiner lifting skipped for step Save new watermarks/Write/WriteImpl/GroupByKey: GroupByKey not fol...
 2018-06-08 (08:02:25) Combiner lifting skipped for step Group watermarks: GroupByKey not followed by a combiner.
 2018-06-08 (08:02:25) Expanding GroupByKey operations into optimizable parts.
 2018-06-08 (08:02:26) Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
 2018-06-08 (08:02:26) Annotating graph with Autotuner information.
 2018-06-08 (08:02:26) Fusing adjacent ParDo, Read, Write, and Flatten operations
 2018-06-08 (08:02:26) Fusing consumer Get rows from CloudSQL tables into Begin pipeline with watermarks/Read
 2018-06-08 (08:02:26) Fusing consumer Group watermarks/Write into Group watermarks/Reify
 2018-06-08 (08:02:26) Fusing consumer Group watermarks/GroupByWindow into Group watermarks/Read
 2018-06-08 (08:02:26) Fusing consumer Save new watermarks/Write/WriteImpl/WriteBundles/WriteBundles into Save new watermar...
 2018-06-08 (08:02:26) Fusing consumer Save new watermarks/Write/WriteImpl/GroupByKey/GroupByWindow into Save new watermark...
 2018-06-08 (08:02:26) Fusing consumer Save new watermarks/Write/WriteImpl/GroupByKey/Reify into Save new watermarks/Write/...
 2018-06-08 (08:02:26) Fusing consumer Save new watermarks/Write/WriteImpl/GroupByKey/Write into Save new watermarks/Write/...
 2018-06-08 (08:02:26) Fusing consumer Write to BQ into Get rows from CloudSQL tables
 2018-06-08 (08:02:26) Fusing consumer Group watermarks/Reify into Write to BQ
 2018-06-08 (08:02:26) Fusing consumer Save new watermarks/Write/WriteImpl/Map(<lambda at iobase.py:926>) into Convert dict...
 2018-06-08 (08:02:26) Fusing consumer Save new watermarks/Write/WriteImpl/WindowInto(WindowIntoFn) into Save new watermark...
 2018-06-08 (08:02:26) Fusing consumer Convert dictionary list to single dictionary and json into Remove "watermark" label
 2018-06-08 (08:02:26) Fusing consumer Remove "watermark" label into Group watermarks/GroupByWindow
 2018-06-08 (08:02:26) Fusing consumer Save new watermarks/Write/WriteImpl/InitializeWrite into Save new watermarks/Write/W...
 2018-06-08 (08:02:26) Workflow config is missing a default resource spec.
 2018-06-08 (08:02:26) Adding StepResource setup and teardown to workflow graph.
 2018-06-08 (08:02:26) Adding workflow start and stop steps.
 2018-06-08 (08:02:26) Assigning stage ids.
 2018-06-08 (08:02:26) Executing wait step start25
 2018-06-08 (08:02:26) Executing operation Save new watermarks/Write/WriteImpl/DoOnce/Read+Save new watermarks/Write/WriteI...
 2018-06-08 (08:02:26) Executing operation Save new watermarks/Write/WriteImpl/GroupByKey/Create
 2018-06-08 (08:02:26) Starting worker pool setup.
 2018-06-08 (08:02:26) Executing operation Group watermarks/Create
 2018-06-08 (08:02:26) Starting 1 workers in europe-west1-b...
 2018-06-08 (08:02:27) Value "Group watermarks/Session" materialized.
 2018-06-08 (08:02:27) Value "Save new watermarks/Write/WriteImpl/GroupByKey/Session" materialized.
 2018-06-08 (08:02:27) Executing operation Begin pipeline with watermarks/Read+Get rows from CloudSQL tables+Write to BQ+Gr...
 2018-06-08 (08:02:36) Autoscaling: Raised the number of workers to 0 based on the rate of progress in the currently runnin...
 2018-06-08 (08:02:46) Autoscaling: Raised the number of workers to 1 based on the rate of progress in the currently runnin...
 2018-06-08 (08:03:05) Workers have started successfully.
 2018-06-08 (08:11:37) Cancel request is committed for workflow job: 2018-06-07_23_02_20-5917188751755240698.
 2018-06-08 (08:11:38) Cleaning up.
 2018-06-08 (08:11:38) Starting worker pool teardown.
 2018-06-08 (08:11:38) Stopping worker pool...
 2018-06-08 (08:12:30) Autoscaling: Reduced the number of workers to 0 based on the rate of progress in the currently runni...

Следы стека:

No errors have been received in this time period.

Обновление: временное решение можно найти в моем ответе ниже


  • Не могли бы вы предоставить нам полный журнал с указанием фактической ошибки? Потому что только по Setting node annotation to enable volume controller attach/detach мы мало что можем увидеть, что происходит и почему. 06.06.2018
  • @komarkovich спасибо за комментарий! Есть ли подходящий способ предоставить вам файлы журнала? Сам воркер пока не показывает никаких логов (может потому, что не запущен). Я не могу разместить здесь все логи системы, кубелета и т. Д., Потому что они слишком длинные. 07.06.2018
  • Мне нужно, чтобы вы предоставили мне журналы сбойного задания Dataflow. Вы можете найти их в журналах вакансий https://console.cloud.google.com/dataflow?jobsDetail/locations/<ZONE>/jobs/<JOB_ID>?project=<PROJECT_NAME>. Должны быть некоторые ошибки, которые должны рассказать нам о том, что происходит. Необязательно публиковать все журналы (только самые относительные). Если их слишком много, вы можете использовать инструмент [justPasteIt] (justpaste.it), чтобы поделиться ими здесь. 07.06.2018
  • Обновленный пост с лог-файлами (спасибо за подсказку с justpaste.it). Я скопировал журналы из Просмотрщика журналов. К сожалению, всегда попадал в список вакансий при использовании вашей ссылки выше с моими спецификациями. 08.06.2018
  • Спасибо за это, но я хотел не это. Пожалуйста, опубликуйте журналы потока данных. Извините за ссылку, эта должна быть правильной: https://console.cloud.google.com/dataflow/jobsDetail/locations/<ZONE>/jobs/<JOB_ID>?project=<PROJECT_NAME>. Найдите здесь журналы этого задания и предоставьте трассировку стека. 08.06.2018
  • Спасибо за ответ. Добавлены журналы заданий и трассировки стека (которые не показывают ошибок). Есть ли у вас какие-либо идеи? 08.06.2018
  • Присмотревшись к тому, чего вы пытаетесь достичь здесь, я сообщил, что в настоящее время это невозможно, потому что луч Apache [не предлагает соединителя] (beam.apache.org/documentation/io/built-in) для Python. Для обходного пути вы можете взглянуть на [this] (stackoverflow.com/questions/46528343/) И так. Однако я предлагаю решить эту проблему здесь 08.06.2018
  • Спасибо. Раньше моим обходным путем был специальный ParDo, где я использую psycopg2 с прокси CloudSQL. Но моя единственная проблема в том, что прокси не будет работать в Dataflow (хотя на моем локальном компьютере он работает). Вы не знаете, как заставить прокси CloudSQL работать в Dataflow? 08.06.2018
  • В настоящий момент невозможно использовать прокси-сервер Cloud SQL с Dataflow. Я бы пошел дальше, последовал совету @komarkovich и решил проблему на GitHub. А пока вы можете следовать его советам по обходному пути. 11.06.2018
  • Обновил мой пост с текущим обходным решением (с использованием подключения через IP с сертификатами SSL). Спасибо всем за помощь. Не стесняйтесь комментировать текущее обходное решение 12.06.2018
  • @ThomasSchmidt, поскольку вы нашли обходной путь для своей проблемы, не могли бы вы опубликовать его в качестве ответа и принять его, чтобы другие пользователи могли извлечь из этого пользу? Спасибо. 12.06.2018

Ответы:


1

Обходное решение:

Наконец-то я нашел обходной путь. Мне пришла в голову идея подключиться через публичный IP-адрес экземпляра CloudSQL. Для этого вам нужно было разрешить подключения к вашему экземпляру CloudSQL с каждого IP-адреса:

  1. Перейдите на страницу обзора вашего экземпляра CloudSQL в GCP.
  2. Перейдите на вкладку Authorization
  3. Нажмите Add network и добавьте 0.0.0.0/0 (!! это позволит каждому IP-адресу подключиться к вашему экземпляру !!)

Чтобы повысить безопасность процесса, я использовал ключи SSL и разрешил только SSL-соединения с экземпляром:

  1. Перейдите на вкладку SSL
  2. Нажмите Create a new certificate, чтобы создать сертификат SSL для вашего сервера.
  3. Нажмите Create a client certificate, чтобы создать сертификат SSL для вашего клиента.
  4. Нажмите Allow only SSL connections, чтобы отклонить все попытки подключения без SSL.

После этого я сохранил сертификаты в корзине Google Cloud Storage и загружаю их перед подключением в задании Dataflow, то есть:

import psycopg2
import psycopg2.extensions
import os
import stat
from google.cloud import storage

# Function to wait for open connection when processing parallel
def wait(conn):
    while 1:
        state = conn.poll()
        if state == psycopg2.extensions.POLL_OK:
            break
        elif state == psycopg2.extensions.POLL_WRITE:
            pass
            select.select([], [conn.fileno()], [])
        elif state == psycopg2.extensions.POLL_READ:
            pass
            select.select([conn.fileno()], [], [])
        else:
            raise psycopg2.OperationalError("poll() returned %s" % state)

# Function which returns a connection which can be used for queries
def connect_to_db(host, hostaddr, dbname, user, password, sslmode = 'verify-full'):

    # Get keys from GCS
    client = storage.Client()

    bucket = client.get_bucket(<YOUR_BUCKET_NAME>)

    bucket.get_blob('PATH_TO/server-ca.pem').download_to_filename('server-ca.pem')
    bucket.get_blob('PATH_TO/client-key.pem').download_to_filename('client-key.pem')
    os.chmod("client-key.pem", stat.S_IRWXU)
    bucket.get_blob('PATH_TO/client-cert.pem').download_to_filename('client-cert.pem')

    sslrootcert = 'server-ca.pem'
    sslkey = 'client-key.pem'
    sslcert = 'client-cert.pem'

    con = psycopg2.connect(
        host = host,
        hostaddr = hostaddr,
        dbname = dbname,
        user = user,
        password = password,
        sslmode=sslmode,
        sslrootcert = sslrootcert,
        sslcert = sslcert,
        sslkey = sslkey)
    return con

Затем я использую эти функции в пользовательском ParDo для выполнения запросов.
Минимальный пример:

import apache_beam as beam

class ReadSQLTableNames(beam.DoFn):
    '''
    parDo class to get all table names of a given cloudSQL database.
    It will return each table name.
    '''
    def __init__(self, host, hostaddr, dbname, username, password):
        super(ReadSQLTableNames, self).__init__()
        self.host = host
        self.hostaddr = hostaddr
        self.dbname = dbname
        self.username = username
        self.password = password

    def process(self, element):

        # Connect do database
        con = connect_to_db(host = self.host,
            hostaddr = self.hostaddr,
            dbname = self.dbname,
            user = self.username,
            password = self.password)
        # Wait for free connection
        wait_select(con)
        # Create cursor to query data
        cur = con.cursor(cursor_factory=RealDictCursor)

        # Get all table names
        cur.execute(
        """
        SELECT
        tablename as table
        FROM pg_tables
        WHERE schemaname = 'public'
        """
        )
        table_names = cur.fetchall()

        cur.close()
        con.close()
        for table_name in table_names:
            yield table_name["table"]

Тогда часть конвейера могла бы выглядеть так:

# Current workaround to query all tables: 
# Create a dummy initiator PCollection with one element
init = p        |'Begin pipeline with initiator' >> beam.Create(['All tables initializer'])

tables = init   |'Get table names' >> beam.ParDo(ReadSQLTableNames(
                                                host = known_args.host,
                                                hostaddr = known_args.hostaddr,
                                                dbname = known_args.db_name,
                                                username = known_args.user,
                                                password = known_args.password))

Я надеюсь, что это решение поможет другим справиться с подобными проблемами.

13.06.2018
  • Гарантирует ли этот метод, что шифрование GCS по умолчанию сохраняется при передаче, пока сертификаты загружаются в задание Dataflow? @komarkovich 28.11.2018
  • Значит, это невозможно сделать с файлом setup.py и конфигурацией прокси? 19.12.2018
  • @IoT пока не нашел решения для прокси. Я надеюсь, что в будущем будет хороший способ, потому что недавно у меня возникли некоторые проблемы с моей работой. Иногда загруженный файл пуст, и мне нужно было добавить несколько проверок и повторных попыток 19.12.2018
  • Спасибо @ThomasSchmidt. Я надеюсь, что Google будет работать усерднее, потому что находится слишком далеко от двух других основных облачных компаний. 19.12.2018

  • 2

    Мне удалось найти лучшее или, по крайней мере, более простое решение. В функции настройки DoFn используйте облачный прокси для настройки предварительного подключения

    class MyDoFn(beam.DoFn):
     def setup(self):
        os.system("wget https://dl.google.com/cloudsql/cloud_sql_proxy.linux.amd64 -O cloud_sql_proxy")
        os.system("chmod +x cloud_sql_proxy")
        os.system(f"./cloud_sql_proxy -instances={self.sql_args['cloud_sql_connection_name']}=tcp:3306 &")
    
    01.05.2020
  • Задание выдает ошибку RuntimeError: mysql.connector.errors.InterfaceError: 2003: не удается подключиться к серверу MySQL на локальном хосте: 3306, хотя он может получить доступ к таблице. 05.08.2020
  • Я думаю, что для потока данных с частным IP-адресом людям может потребоваться добавить файл прокси в облачное хранилище. 06.11.2020
  • @sernle Cloud NAT позволит использовать вышеупомянутое решение с частным потоком данных IP, но если Cloud NAT не является вариантом, я согласен, что прокси-файл в облачном хранилище является разумным обходным путем 09.06.2021
  • Новые материалы

    Угловая структура архитектуры
    Обратите внимание, что эта статья устарела, я решил создать новую с лучшей структурой и с учетом автономных компонентов: https://medium.com/@marekpanti/angular-standalone-architecture-b645edd0d54a..

    «Данные, которые большинство людей используют для обучения своих моделей искусственного интеллекта, поставляются со встроенным…
    Первоначально опубликовано HalkTalks: https://hacktown.com.br/blog/blog/os-dados-que-a-maioria-das-pessoas-usa-para-treinar-seus-modelos-de-inteligencia-artificial- ja-vem-com-um-vies-embutido/..

    Сильный ИИ против слабого ИИ: различия парадигм искусственного интеллекта
    В последние годы изучению и развитию искусственного интеллекта (ИИ) уделяется большое внимание и прогресс. Сильный ИИ и Слабый ИИ — две основные парадигмы в области искусственного интеллекта...

    Правильный способ добавить Firebase в ваш проект React с помощью React Hooks
    React + Firebase - это мощная комбинация для быстрого и безопасного создания приложений, от проверки концепции до массового производства. Раньше (знаете, несколько месяцев назад) добавление..

    Создайте API с помощью Python FastAPI
    Создание API с помощью Python становится очень простым при использовании пакета FastAPI. После установки и импорта вы можете создать приложение FastAPI и указать несколько конечных точек. Каждой..

    Веселье с прокси-сервером JavaScript
    Прокси-серверы JavaScript — это чистый сахар, если вы хотите создать некоторую общую логику в своих приложениях, чтобы облегчить себе жизнь. Вот один пример: Связь клиент-сервер Мы..

    Получить бесплатный хостинг для разработчиков | Разместите свой сайт за несколько шагов 🔥
    Статические веб-сайты — это веб-страницы с фиксированным содержанием и его постоянным содержанием. Но теперь статические сайты также обрабатывают динамические данные с помощью API и запросов...