Mi experiencia en el mundo de Big Data – Parte II

Mi experiencia en el mundo de Big Data – Parte II
Autor Bluetab
Categoría Tech

Mi experiencia en el mundo de Big Data – Parte II

Senior Data Engineer Share on twitter
Share on linkedinEn la entrega anterior (adjunto) creamos los scripts para enlistar y descargar archivos desde Google Drive hacia nuestro filesystem local.En esta entrega continuaremos con el código de la función processDriveFiles.py y crearemos los scripts para hacer la carga de archivos hacia Google CloudLa funcionalidad de este script es procesar los archivos listados en nuestro archivo parameters.csv, los cuales tengan el parámero Status con valor 1, recordemos que esto le indica a nuestro programa si el archivo se descargará y procesará o no.A continuación, el código básico de esta función. Para nuestro ejemplo solo incluiremos archivos con extensión csv y separados por pipes “|”.En pasos anteriores ya descargamos nuestro archivo al servidor local, el paso siguiente será ingestarlo en Big query y subir el archivo a nuestro proyecto de GCP.El siguiente código se encarga de validar el archivo e ingestarlo hacia nuestro destino definido.#Validamos que el tamaño del archivo sea mayor a 0 para poder cargarlo al destino definido en el archivo de configuración, en este caso nuestro destino es Google Cloud Storage y BigQuery, al cual le dimos el valor 1 en nuestro archivo. file_size=os.stat(props['archivo_origen']).st_size if (int(file_size)>0): if(int(props['Destino'])=1): #Tenemos las variables siguientes, sus valores son devueltos por la función upload_GCS_BQ: #exit_codeBQ - Bandera para indicar si la ingesta fue exitosa o no. #registros - Almacena el numero de registros del archivo. #Timestamp_date – La fecha en que se hace la ingesta. #strerror - Si hay error en la ingesta, esta variable almacena el #código del error exit_codeBQ,registros,Timestamp_Date,strerror=upload_GCS_BQ(creds,props,item['id']) else: print('archivo vacio') #Al final del proceso, eliminamos los archivos descargados a nuestro servidor, para liberar el espacio ocupado file_name=str(props.get('archivo_origen')).split('.') fname = file_name[0]+'.*' r = glob.glob(fname) #función usada por python para buscar archivos for i in r: print('Eliminando..'+str(i)) os.remove(i) A continuación, el código de la función upload_GCS_BQ el cual realiza la ingesta del archivo al proyecto de Google Cloud definido en el archivo de configuración.#Librerias de GCP from google.cloud import bigquery from google.cloud import storage from google.api_core.exceptions import BadRequest from google.cloud.exceptions import NotFound from apiclient.errors import HttpError #Biblioteca de Python para manejo de archivos csv import csv def upload_GCS_BQ(creds,props,file_id): exit_codeBQ=0 strerror="" registros=0 Timestamp_Date = datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S.%f %Z') # obtenemos la fecha de sistema en formato Timestamp #Se realiza la carga a Google Cloud Storage Current_Date = datetime.datetime.today().strftime ('%Y-%b-%d %H_%M_%S') #Dentro de props, vienen las propiedades del archivo #a cargar, dividimos el nombre del archivo para agregarle #la fecha y así crear un archivo de respaldo if props.get('archivo_origen').find('.')!=-1: file_part=props.get('archivo_origen').split('.',1) filename_bkp=file_part[0]+' '+str(Current_Date)+'.'+file_part[1] else: filename_bkp=props.get('archivo_origen')+str(Current_Date) #usando funciones de las bibliotecas de google se realiza la carga del archivo a Google Cloud Storage try: bucket = creds.get('clientGS').get_bucket(props.get('Bucket_GCS')) blob = bucket.blob(props.get('Path_GCS')+props.get('archivo_origen')) blob.upload_from_filename(props.get('archivo_origen')) registros=0 dest_bucket = creds.get('clientGS').get_bucket(props.get('Bucket_GCS')) new_blob_name=props.get('Path_GCS_bkp')+filename_bkp new_blob = bucket.copy_blob( blob, dest_bucket, new_blob_name) #Seteamos la variable exit_codeBQ en 1 para validar que la carga fue exitosa exit_codeBQ=1 #si hay errores en la carga se setea la variable a 0 except BadRequest as e: for err in e.errors: error=err exit_codeBQ=0 La segunda parte de la función realiza la carga a BigQuery, a partir del archivo que ya está en nuestro bucket de Google Cloud Storage# Configuramos las opciones de la tabla definidas en el API de BigQuery dataset_ref = creds.get('clientBQ').dataset(str(props.get('DataSet_BQ'))) job_config = bigquery.LoadJobConfig() job_confighis = bigquery.LoadJobConfig() job_config.skip_leading_rows = 1 job_confighis.skip_leading_rows=1 job_config.field_delimiter = '|' job_confighis.field_delimiter = '|' job_config.write_disposition = 'WRITE_TRUNCATE' job_confighis.write_disposition = 'WRITE_APPEND' job_config.autodetect=True job_confighis.autodetect=True #Establecemos el formato de origen de nuestro archivo como CSV job_config.source_format = bigquery.SourceFormat.CSV job_confighis.source_format = bigquery.SourceFormat.CSV uri = "gs://"+props.get('Bucket_GCS')+"/"+props.get('Path_GCS')+props.get('archivo_origen') #Este es el path de nuestro archive en Cloud Storage try: load_job = creds.get('clientBQ').load_table_from_uri( uri, dataset_ref.table(props.get('Tabla')), job_config=job_config) # API request load_job.result() #Espera a que termine la carga de la tabla. destination_table = creds.get('clientBQ').get_table(dataset_ref.table(props.get('Tabla'))) registros=destination_table.num_rows #Obtenemos el id de la tabla a partir de las propiedades definidas table_id=str(props.get('proyecto')) +'.'+str(props.get('DataSet_BQ'))+'.'+str(props.get('Tabla')) table = creds.get('clientBQ').get_table(table_id) #Agregamos un campo para colocar la fecha de modificación de la tabla original_schema = table.schema new_schema = original_schema[:] # Creates a copy of the schema. new_schema.append(bigquery.SchemaField("FECHA_MODIFICACION", "TIMESTAMP")) table.schema = new_schema table = creds.get('clientBQ').update_table(table, ["schema"]) #Hacemos un update para agregar la fecha de modificación queryUpdate="UPDATE "+str(props.get('DataSet_BQ'))+"."+str(props.get('Tabla')) +" SET FECHA_MODIFICACION = TIMESTAMP('"+Timestamp_Date.strip() +"') WHERE TRUE" dml_statement = ("UPDATE "+str(props.get('DataSet_BQ'))+"."+str(props.get('Tabla')) +" SET FECHA_MODIFICACION = TIMESTAMP('"+Timestamp_Date.strip() +"') WHERE TRUE") query_job = creds.get('clientBQ').query(dml_statement) query_job.result() #Seteamos la variable exit_codeBQ en 1 para validar que la carga fue exitosa exit_codeBQ=1 #si hay errores en la carga se setea la variable a 0 except BadRequest as e: for err in e.errors: strerror=str(err) exit_codeBQ=0 #Con este return devolvemos los valores de cada variable a la función principal return exit_codeBQ,registros,Timestamp_Date,strerror Este es el Código básico para cargar nuestros archivos en Google Cloud Storage y Big Query, haciendo uso de las funciones incluidas en sus APIs.Para mayor referencia de su uso, puedes consultar los siguientes enlaces: Google Drive: https://developers.google.com/drive/api/v2/about-sdkGoogle Cloud Storage: https://cloud.google.com/storage/docs/reference/libraries#client-libraries-usage-pythonGoogle Big Query: https://cloud.google.com/bigquery/docs/reference/libraries#client-libraries-usage-pythonCargar un archivo CSV desde Cloud Storage:https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv
¿Quieres saber más de lo que ofrecemos y ver otros casos de éxito?


DESCUBRE BLUETAB

Share on twitter
Share on linkedin

SOLUCIONES, SOMOS EXPERTOS

DATA STRATEGY


DATA FABRIC


AUGMENTED ANALYTICS

Siguientes pasos con Bluetab

Habla con Bluetab

Cuéntanos qué problema necesitas resolver y definamos el siguiente paso con un especialista de Bluetab.

Hablemos

Únete a Bluetab

¿Te gustó este artículo? Se parte de Bluetab.

Únete