Mi experiencia en el mundo de Big Data - Parte II

David Emmanuel Reyes Núñez

Senior Data Engineer

En la entrega anterior (adjunto) creamos los scripts para enlistar y descargar archivos desde Google Drive hacia nuestro filesystem local.

En esta entrega continuaremos con el código de la función processDriveFiles.py y crearemos los scripts para hacer la carga de archivos hacia Google Cloud

La funcionalidad de este script es procesar los archivos listados en nuestro archivo parameters.csv, los cuales tengan el parámero Status con valor 1, recordemos que esto le indica a nuestro programa si el archivo se descargará y procesará o no.

A continuación, el código básico de esta función. Para nuestro ejemplo solo incluiremos archivos con extensión csv y separados por pipes “|”.

En pasos anteriores ya descargamos nuestro archivo al servidor local, el paso siguiente será ingestarlo en Big query y subir el archivo a nuestro proyecto de GCP.

El siguiente código se encarga de validar el archivo e ingestarlo hacia nuestro destino definido.

#Validamos que el tamaño del archivo sea mayor a 0 para poder cargarlo al destino definido en el archivo de configuración, en este caso nuestro destino es Google Cloud Storage y BigQuery, al cual le dimos el valor 1 en nuestro archivo.
file_size=os.stat(props['archivo_origen']).st_size

if (int(file_size)>0):
   if(int(props['Destino'])=1):        

#Tenemos las variables siguientes, sus valores son devueltos por la función upload_GCS_BQ:
#exit_codeBQ  - Bandera para indicar si la ingesta fue exitosa o no.
#registros    - Almacena el numero de registros del archivo.
#Timestamp_date – La fecha en que se hace la ingesta.
#strerror  - Si hay error en la ingesta, esta variable almacena el #código del error
                           exit_codeBQ,registros,Timestamp_Date,strerror=upload_GCS_BQ(creds,props,item['id'])
else:
    print('archivo vacio')

#Al final del proceso, eliminamos los archivos descargados a nuestro servidor, para liberar el espacio ocupado

file_name=str(props.get('archivo_origen')).split('.')
    fname = file_name[0]+'.*'
    r = glob.glob(fname) #función usada por python para buscar archivos
    for i in r:
        print('Eliminando..'+str(i))
        os.remove(i)
 

A continuación, el código de la función upload_GCS_BQ el cual realiza la ingesta del archivo al proyecto de Google Cloud definido en el archivo de configuración.

#Librerias de GCP 
from google.cloud import bigquery
from google.cloud import storage
from google.api_core.exceptions import BadRequest
from google.cloud.exceptions import NotFound
from apiclient.errors import HttpError
#Biblioteca de Python para manejo de archivos csv
import csv

def upload_GCS_BQ(creds,props,file_id):
    
    exit_codeBQ=0
    strerror=""
    registros=0
    Timestamp_Date = datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S.%f %Z') # obtenemos la fecha de sistema en formato Timestamp
   
        #Se realiza la carga a Google Cloud Storage
        Current_Date = datetime.datetime.today().strftime ('%Y-%b-%d %H_%M_%S')
        #Dentro de props, vienen las propiedades del archivo
        #a cargar, dividimos el nombre del archivo para agregarle 
        #la fecha y así crear un archivo de respaldo
        if props.get('archivo_origen').find('.')!=-1:
            file_part=props.get('archivo_origen').split('.',1)
            filename_bkp=file_part[0]+' '+str(Current_Date)+'.'+file_part[1]
        else:
            filename_bkp=props.get('archivo_origen')+str(Current_Date)
        #usando funciones de las bibliotecas de google se realiza la carga del archivo a Google Cloud Storage
        try:
            bucket = creds.get('clientGS').get_bucket(props.get('Bucket_GCS'))    

            blob = bucket.blob(props.get('Path_GCS')+props.get('archivo_origen'))
            blob.upload_from_filename(props.get('archivo_origen'))

            registros=0

            dest_bucket = creds.get('clientGS').get_bucket(props.get('Bucket_GCS'))

            new_blob_name=props.get('Path_GCS_bkp')+filename_bkp
            new_blob = bucket.copy_blob(
                             blob, dest_bucket, new_blob_name)


            #Seteamos la variable exit_codeBQ en 1 para validar que la carga fue exitosa
            exit_codeBQ=1
        #si hay errores en la carga se setea la variable a 0
        except BadRequest as e:
            for err in e.errors:
                error=err
            exit_codeBQ=0
 

La segunda parte de la función realiza la carga a BigQuery, a partir del archivo que ya está en nuestro bucket de Google Cloud Storage

# Configuramos las opciones de la tabla definidas en el API de BigQuery
        dataset_ref =   creds.get('clientBQ').dataset(str(props.get('DataSet_BQ')))
        job_config = bigquery.LoadJobConfig()
        job_confighis = bigquery.LoadJobConfig()
        job_config.skip_leading_rows = 1
        job_confighis.skip_leading_rows=1
        job_config.field_delimiter = '|'
        job_confighis.field_delimiter = '|'
        job_config.write_disposition = 'WRITE_TRUNCATE'
        job_confighis.write_disposition = 'WRITE_APPEND'
        job_config.autodetect=True
        job_confighis.autodetect=True

#Establecemos el formato de origen de nuestro archivo como CSV
        job_config.source_format = bigquery.SourceFormat.CSV
        job_confighis.source_format = bigquery.SourceFormat.CSV
        uri = "gs://"+props.get('Bucket_GCS')+"/"+props.get('Path_GCS')+props.get('archivo_origen') #Este es el path de nuestro archive en Cloud Storage

        try:
            load_job = creds.get('clientBQ').load_table_from_uri(
                uri, dataset_ref.table(props.get('Tabla')), job_config=job_config)  # API request

            load_job.result()  #Espera a que termine la carga de la tabla.
            destination_table = creds.get('clientBQ').get_table(dataset_ref.table(props.get('Tabla')))
            registros=destination_table.num_rows
#Obtenemos el id de la tabla a partir de las propiedades definidas            
table_id=str(props.get('proyecto')) +'.'+str(props.get('DataSet_BQ'))+'.'+str(props.get('Tabla'))

            table = creds.get('clientBQ').get_table(table_id)  
            



#Agregamos un campo para colocar la fecha de modificación de la tabla
            original_schema = table.schema
            new_schema = original_schema[:]  # Creates a copy of the schema.
            new_schema.append(bigquery.SchemaField("FECHA_MODIFICACION", "TIMESTAMP"))

            table.schema = new_schema
            table = creds.get('clientBQ').update_table(table, ["schema"])  

#Hacemos un update para agregar la fecha de modificación
queryUpdate="UPDATE "+str(props.get('DataSet_BQ'))+"."+str(props.get('Tabla')) +" SET FECHA_MODIFICACION = TIMESTAMP('"+Timestamp_Date.strip() +"') WHERE TRUE"
            dml_statement = ("UPDATE "+str(props.get('DataSet_BQ'))+"."+str(props.get('Tabla')) +" SET FECHA_MODIFICACION = TIMESTAMP('"+Timestamp_Date.strip() +"') WHERE TRUE")
            query_job = creds.get('clientBQ').query(dml_statement)  
            query_job.result()  

                     #Seteamos la variable exit_codeBQ en 1 para validar que la carga fue exitosa
 
            exit_codeBQ=1
  #si hay errores en la carga se setea la variable a 0
except BadRequest as e:
            for err in e.errors:
                strerror=str(err)
            
            exit_codeBQ=0
     
    #Con este return devolvemos los valores de cada variable a la función principal
    return exit_codeBQ,registros,Timestamp_Date,strerror
 

Este es el Código básico para cargar nuestros archivos en Google Cloud Storage y Big Query, haciendo uso de las funciones incluidas en sus APIs.

¿Quieres saber más de lo que ofrecemos y ver otros casos de éxito?