Senior Data Engineer
En la entrega anterior (adjunto) creamos los scripts para enlistar y descargar archivos desde Google Drive hacia nuestro filesystem local.
En esta entrega continuaremos con el código de la función processDriveFiles.py y crearemos los scripts para hacer la carga de archivos hacia Google Cloud
La funcionalidad de este script es procesar los archivos listados en nuestro archivo parameters.csv, los cuales tengan el parámero Status con valor 1, recordemos que esto le indica a nuestro programa si el archivo se descargará y procesará o no.
A continuación, el código básico de esta función. Para nuestro ejemplo solo incluiremos archivos con extensión csv y separados por pipes “|”.
En pasos anteriores ya descargamos nuestro archivo al servidor local, el paso siguiente será ingestarlo en Big query y subir el archivo a nuestro proyecto de GCP.
El siguiente código se encarga de validar el archivo e ingestarlo hacia nuestro destino definido.
#Validamos que el tamaño del archivo sea mayor a 0 para poder cargarlo al destino definido en el archivo de configuración, en este caso nuestro destino es Google Cloud Storage y BigQuery, al cual le dimos el valor 1 en nuestro archivo.
file_size=os.stat(props['archivo_origen']).st_size
if (int(file_size)>0):
if(int(props['Destino'])=1):
#Tenemos las variables siguientes, sus valores son devueltos por la función upload_GCS_BQ:
#exit_codeBQ - Bandera para indicar si la ingesta fue exitosa o no.
#registros - Almacena el numero de registros del archivo.
#Timestamp_date – La fecha en que se hace la ingesta.
#strerror - Si hay error en la ingesta, esta variable almacena el #código del error
exit_codeBQ,registros,Timestamp_Date,strerror=upload_GCS_BQ(creds,props,item['id'])
else:
print('archivo vacio')
#Al final del proceso, eliminamos los archivos descargados a nuestro servidor, para liberar el espacio ocupado
file_name=str(props.get('archivo_origen')).split('.')
fname = file_name[0]+'.*'
r = glob.glob(fname) #función usada por python para buscar archivos
for i in r:
print('Eliminando..'+str(i))
os.remove(i)
A continuación, el código de la función upload_GCS_BQ el cual realiza la ingesta del archivo al proyecto de Google Cloud definido en el archivo de configuración.
#Librerias de GCP
from google.cloud import bigquery
from google.cloud import storage
from google.api_core.exceptions import BadRequest
from google.cloud.exceptions import NotFound
from apiclient.errors import HttpError
#Biblioteca de Python para manejo de archivos csv
import csv
def upload_GCS_BQ(creds,props,file_id):
exit_codeBQ=0
strerror=""
registros=0
Timestamp_Date = datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S.%f %Z') # obtenemos la fecha de sistema en formato Timestamp
#Se realiza la carga a Google Cloud Storage
Current_Date = datetime.datetime.today().strftime ('%Y-%b-%d %H_%M_%S')
#Dentro de props, vienen las propiedades del archivo
#a cargar, dividimos el nombre del archivo para agregarle
#la fecha y así crear un archivo de respaldo
if props.get('archivo_origen').find('.')!=-1:
file_part=props.get('archivo_origen').split('.',1)
filename_bkp=file_part[0]+' '+str(Current_Date)+'.'+file_part[1]
else:
filename_bkp=props.get('archivo_origen')+str(Current_Date)
#usando funciones de las bibliotecas de google se realiza la carga del archivo a Google Cloud Storage
try:
bucket = creds.get('clientGS').get_bucket(props.get('Bucket_GCS'))
blob = bucket.blob(props.get('Path_GCS')+props.get('archivo_origen'))
blob.upload_from_filename(props.get('archivo_origen'))
registros=0
dest_bucket = creds.get('clientGS').get_bucket(props.get('Bucket_GCS'))
new_blob_name=props.get('Path_GCS_bkp')+filename_bkp
new_blob = bucket.copy_blob(
blob, dest_bucket, new_blob_name)
#Seteamos la variable exit_codeBQ en 1 para validar que la carga fue exitosa
exit_codeBQ=1
#si hay errores en la carga se setea la variable a 0
except BadRequest as e:
for err in e.errors:
error=err
exit_codeBQ=0
La segunda parte de la función realiza la carga a BigQuery, a partir del archivo que ya está en nuestro bucket de Google Cloud Storage
# Configuramos las opciones de la tabla definidas en el API de BigQuery
dataset_ref = creds.get('clientBQ').dataset(str(props.get('DataSet_BQ')))
job_config = bigquery.LoadJobConfig()
job_confighis = bigquery.LoadJobConfig()
job_config.skip_leading_rows = 1
job_confighis.skip_leading_rows=1
job_config.field_delimiter = '|'
job_confighis.field_delimiter = '|'
job_config.write_disposition = 'WRITE_TRUNCATE'
job_confighis.write_disposition = 'WRITE_APPEND'
job_config.autodetect=True
job_confighis.autodetect=True
#Establecemos el formato de origen de nuestro archivo como CSV
job_config.source_format = bigquery.SourceFormat.CSV
job_confighis.source_format = bigquery.SourceFormat.CSV
uri = "gs://"+props.get('Bucket_GCS')+"/"+props.get('Path_GCS')+props.get('archivo_origen') #Este es el path de nuestro archive en Cloud Storage
try:
load_job = creds.get('clientBQ').load_table_from_uri(
uri, dataset_ref.table(props.get('Tabla')), job_config=job_config) # API request
load_job.result() #Espera a que termine la carga de la tabla.
destination_table = creds.get('clientBQ').get_table(dataset_ref.table(props.get('Tabla')))
registros=destination_table.num_rows
#Obtenemos el id de la tabla a partir de las propiedades definidas
table_id=str(props.get('proyecto')) +'.'+str(props.get('DataSet_BQ'))+'.'+str(props.get('Tabla'))
table = creds.get('clientBQ').get_table(table_id)
#Agregamos un campo para colocar la fecha de modificación de la tabla
original_schema = table.schema
new_schema = original_schema[:] # Creates a copy of the schema.
new_schema.append(bigquery.SchemaField("FECHA_MODIFICACION", "TIMESTAMP"))
table.schema = new_schema
table = creds.get('clientBQ').update_table(table, ["schema"])
#Hacemos un update para agregar la fecha de modificación
queryUpdate="UPDATE "+str(props.get('DataSet_BQ'))+"."+str(props.get('Tabla')) +" SET FECHA_MODIFICACION = TIMESTAMP('"+Timestamp_Date.strip() +"') WHERE TRUE"
dml_statement = ("UPDATE "+str(props.get('DataSet_BQ'))+"."+str(props.get('Tabla')) +" SET FECHA_MODIFICACION = TIMESTAMP('"+Timestamp_Date.strip() +"') WHERE TRUE")
query_job = creds.get('clientBQ').query(dml_statement)
query_job.result()
#Seteamos la variable exit_codeBQ en 1 para validar que la carga fue exitosa
exit_codeBQ=1
#si hay errores en la carga se setea la variable a 0
except BadRequest as e:
for err in e.errors:
strerror=str(err)
exit_codeBQ=0
#Con este return devolvemos los valores de cada variable a la función principal
return exit_codeBQ,registros,Timestamp_Date,strerror
Este es el Código básico para cargar nuestros archivos en Google Cloud Storage y Big Query, haciendo uso de las funciones incluidas en sus APIs.
Para mayor referencia de su uso, puedes consultar los siguientes enlaces:
Google Drive: https://developers.google.com/drive/api/v2/about-sdkGoogle Cloud Storage: https://cloud.google.com/storage/docs/reference/libraries#client-libraries-usage-pythonGoogle Big Query: https://cloud.google.com/bigquery/docs/reference/libraries#client-libraries-usage-pythonCargar un archivo CSV desde Cloud Storage:https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv
Patron
Sponsor
© 2024 Bluetab Solutions Group, SL. All rights reserved.