Un proyecto de punta a punta realizado en la materia de Data Product Architecture.
- Proyecto
- Configuración
- Estructura del proyecto
- Levantamiento del producto de datos
- Levantamiento de API
- Levantamiento de dashboard de monitoreo
- Orquestación
- Pruebas Unitarias
- Análisis de sesgo e inequidad
- Proceso de ingesta manual
La base de datos con la que trabajamos contiene información respecto a las inspecciones a restaurantes y a otros establecimientos de comida en la ciudad de Chicago del primero de enero de 2010 a la fecha actual. (Los datos se encuentran en este link)
Información general de la base de datos:
- 215,000 renglones (cada fila es una inspección realizada)
- 17 columnas
Las columnas contienen información acerca del establecimiento inspeccionado así como del resultado de la inspección. Las siguientes son algunas de las variables disponibles en el conjunto de datos:
- Nombre del establecimiento inspeccionado
- Número de licencia del establecimiento
- Tipo de establecimiento
- Riesgo de la categoría del establecimiento
- Dirección del establecimiento
- Fecha de la inspección
- Tipo de inspección
- Resultados
- Violaciones al reglamento
La pregunta a contestar es la siguiente:
Dadas las características de este establecimiento, ¿pasará o no pasará la inspección?
Es decir, este es un problema de clasificación en donde intentaremos determinar, a partir de características generales de un establecimiento, si podrá o no cumplir con los estándares de salubridad requeridos para pasar una inspección.
Los datos se actualizan diariamente. Sin embargo, los pipelines de este proyecto corren de forma semanal.
Este proyecto está programado y probado usando python 3.7.4. En el directorio raíz se encuentra
un archivo llamado requirements.txt
que contiene todas las dependencias necesarias para ejecutar
satisfactoriamente el proyecto. Para instalarlas usar el siguiente comando:
pip install -r requirements.txt
Para poder correr los Jupyter notebooks se debe adicionar el csv de datos de inspecciones de la
ciudad de Chicago con el nombre Food_Inspections.csv
dentro del directorio data
.
Para conectarse programáticamente a AWS y a la API del set de datos debe de existir un archivo de configuraciones en
la ruta conf/local/credentials.yaml
. La estructura del este archivo debe ser la siguiente:
---
s3:
aws_access_key_id: SU_AWS_ACCESS_KEY_ID
aws_secret_access_key: SU_AWS_SECRET_ACCESS_KEY
food_inspections:
api_token: SU_APP_TOKEN_DE_CHICAGO_API
data_base:
user: USUARIO
password: PASSWORD
database: NOMBRE_DE_BASE_DE_DATOS
host: IP_DE_INSTANCIA_DE_BASE_DE_DATOS_EN_AWS
port: PUERTO
string: postgresql://<usuario>:<contraseña>@<host>/<nombre-base-de-datos>
...
Para poder ejecutar el proyecto satisfactoriamente, es importante personalizar algunas de las constantes que se usan
a lo largo del pipeline y que se encuentran en src.utils.constants.py
. Las que se deben modificar se enlistan a
continuación:
bucket_name
: esta debe ser modificada por el nombre de un bucket de S3 al que usted tenga acceso desde las credenciales de AWS que configuro en el paso anterior.
Para poder ejecutar las secciones del proyecto que requieren de una base de datos postgres en AWS es importante
que exista la base de datos que añadió en conf/local/credentials.yaml
para con nombre NOMBRE_DE_BASE_DE_DATOS
.
En sql/create_metadata_tables.sql
se encuentra el código de SQL necesario para crear el esquema en la base de datos
configurada en caso de ser necesario. No obstante, se recomienda, dejar que Luigi lo realice por sí solo para evitar
pasos adicionales de configuración. Para que esto suceda automáticamente, al correr el orquestador, es necesario
que la base de datos del archivo de credenciales exista y que se hayan agregado correctamente los elementos de conexión
a este mismo archivo.
¡Importante!
Es importante, para lograr exponer el modelo en una API, generar el esquema necesario en la base de datos para que luigi pueda almacenar las observaciones de las cuales se alimenta la misma. Para crear dicho esquema es necesario correr el siguiente comando en la base de datos:
CREATE SCHEMA api;
Esta es la estructura del proyecto incluyendo notebook del EDA llamado eda.ipynb
.
├── README.md <- The top-level README for developers using this project.
├── conf
│ ├── base <- Space for shared configurations like parameters
│ └── local <- Space for local configurations, usually credentials
│ └── credentials.yaml <- Required credentials for connecting to AWS
├── data <- Space for temporary csv files required for testing and EDA
│
├── docs <- Space for Sphinx documentation
│
│
├── img <- Images used for README.md
├── notebooks <- Jupyter notebooks.
│ ├── legacy <- Jupyter notebook drafts
│ ├── shapefiles <- Shapefiles and geojson required for graphing purposes
│ ├── modeling <- Jupyer notebooks used for cleaning, feature engineering and modeling drafts
│ └── eda.ipynb <- Iniital EDA and GEDA for project
│
├── references <- Data dictionaries, manuals, and all other explanatory materials.
│
├── results <- Intermediate analysis as HTML, PDF, LaTeX, etc.
│
├── requirements.txt <- The requirements file
│
├── .gitignore <- Avoids uploading data, credentials, outputs, system files etc
│
├── infrastructure
├── sql <- SQL scripts for generating database tables required for project
├── setup.py
├── temp <- Temporal storage for general use in the project
└── src <- Source code for use in this project.
├── __init__.py <- Makes src a Python module
│
├── utils <- Functions used across the project
│ ├── general.py <- Obtención de credenciales de AWS
│ └── constants.py <- Definición de constantes del proyecto
│
├── etl <- Scripts to transform data from raw to intermediate states
│
├── pipeline
│ ├── ingesta_almacenamiento.py <- ingesta datos desde API y almacenamiento en S3
│ ├── limpieza_feature_engineering.py <- limpieza y generación de features de modelo
│ ├── deprecated_funs.py <- funciones que probablemente no se necesiten
│ ├── bias_fairness.py <- funcionalidad de limpieza e inequidad
│ ├── modelling.py <- entrenamiento y selección de modelo
│ └── prediction.py <- predicción de modelo
│
├── tests
│ ├── ingestion_tests.py <- pruebas unitarias para ingestión
│ ├── upload_tests.py <- pruebas unitarias para almacenamiento
│ ├── clean_data_tests.py <- pruebas unitarias para limpieza y preprocesamiento
│ ├── feature_eng_tests.py <- pruebas unitarias para feature engineering
│ ├── training_tests.py <- pruebas unitarias para entrenamiento
│ ├── selection_tests.py <- pruebas unitarias para selección de modelo
│ ├── aequitas_tests.py <- pruebas unitarias para sesgo e inequidad
│ └── prediction_tests.py <- pruebas unitarias para predicción
│
├── api
│ └── api.py <- código de Flask Swagger para levantamiento de API
│
├── monitoring
│ └── dahsboard.py <- código de dash para dashboard de monitoreo de modelo
│
└── orchestration <- Luigi task definitions used across the project
├── data_ingestion_task.py <- Luigi task for data ingestion
├── ingestion_testing_task.py <- Luigi task for data ingestion unit tests
├── ingestion_metadata_task.py <- Luigi task for data ingestion metadata
├── data_s3_upload_task.py <- Luigi task for data upload to S3
├── data_s3_upload_testing_task.py <- Luigi task for data upload to S3 unit tests
├── data_s3_upload_metadata_task.py <- Luigi task for data upload to S3 metadata
├── clean_data_task.py <- Luigi task for data cleaning
├── clean_data_test_task.py <- Luigi task for data cleaning unit tests
├── clean_data_metadata_task.py <- Luigi task for data cleaning metadata
├── feature_engineering_task.py <- Luigi task for feature engineering
├── feature_eng_test_task.py <- Luigi task for feature engineering unit tests
├── feature_engineering_metadata_task.py <- Luigi task for feature engineering metadata
├── training_task.py <- Luigi task for training
├── training_test_task.py <- Luigi task for training unit tests
├── training_metadata_task.py <- Luigi task for training metadata
├── selection_task.py <- Luigi task for model selection
├── selection_test_task.py <- Luigi task for model selection unit tests
├── selection_metadata_task.py <- Luigi task for model selection metadata
├── aequitas_task.py <- Luigi task for bias and fairness
├── aequitas_test_task.py <- Luigi task for bias and fairness unit tests
├── aequitas_metadata_task.py <- Luigi task for bias and fairness metadata
├── prediction_task.py <- Luigi task for generating predictions
├── prediction_test_task.py <- Luigi task for prediction unit tests
├── prediction_metadata_task.py <- Luigi task for prediction metadata
├── api_storage_task.py <- Luigi task to store predictions on api db
└── monitoring_task.py <- Luigi task to store predictions on monitoring db
Una vez que se siguió adecuadamente las instrucciones de configuración, la primera etapa para levantar el producto de datos en realizar la rama de generación del modelo predictivo. Para ello basta con llamar la task de Luigi encargada de generar los metadatos de sesgo e inequidad (ver sección de orquestación para más información). El comando en específico a utilizar es el siguiente:
PYTHONPATH='.' luigi --module src.orchestration.aequitas_metadata_task AequitasMetaTask --historic [--local-scheduler] [--query-date <YYYY-MM-DD>]
En general se espera no tener que utilizar el parámetro query-date
, pero de ser necesario este servirá para ejecutar
las tareas de generación del modelo con datos históricos hasta la fecha especificada. Por ejemplo si se quisiera entrenar
el modelo con los datos históricos hasta el 4 de enero de 2021 se utilizaría la siguiente línea:
PYTHONPATH='.' luigi --module src.orchestration.aequitas_metadata_task AequitasMetaTask --historic [--local-scheduler] --query-date 2021-01-04
Con estos comandos se generará un modelo a partir del cual se podrán hacer predicciones semana con semana.
Posterior a contar con un modelo seleccionado siguiendo la sección anterior, es posible realizar toda la rama de predicciones semanales con el siguiente comando:
PYTHONPATH='.' luigi --module src.orchestration.monitoring_task MonitoringTask [--local-scheduler] [--query-date <YYYY-MM-DD>]
Como en el caso anterior el uso de query-date
permitirá hacer las predicciones de fechas en el pasado. En caso de
omitirse, que es el uso esperado para el proyecto, se harán las predicciones con los 7 días anteriores al día de
ejecución. La intención de este proyecto es que esta rama se ejecute todos los lunes a las 4 am por lo que siempre
se estarán generando las predicciones de la semana pasada.
Si ya se cuenta con al menos una semana de predicciones, se puede levantar la API del producto de datos usando el siguiente comando:
python3 -m src.api.api
La API quedará disponible con este comando en: http://0.0.0.0:5000/
donde se encuentra la documentación para la misma.
Si ya se cuenta con al menos una semana de predicciones, se puede levantar el dashboard de monitoreo del producto de datos usando el siguiente comando:
python3 -m src.monitoring.dashboard
El dashboard quedará disponible con este comando en: http://0.0.0.0:8050/
.
Esta rama cuenta con 21 tasks de orquestación. A continuación se muestra el DAG de Luigi:
Para ejecutar cualquiera de los siguientes tasks, una vez que se siguieron las instrucciones de configuración, se requiere introducir la siguiente línea de comandos en el directorio raíz del proyecto:
PYTHONPATH='.' luigi --module src.orchestration.<NOMBRE_DE_SCRIPT> <NOMBRE_DE_CLASE> --historic [--local-scheduler] [--query-date <YYYY-MM-DD>]
--historic
: este argumento debe incluirse SIEMPRE en etapas productivas de esta rama de orquestación. Este permite que todos las tareas de entrenamiento, selección de modelo y sesgo e inequidad se realicen con todas las observaciones históricas a la fecha.--query-date
: se debe agregar la fecha de ingesta deseada en formato YYYY-MM-DD. Si se omite este argumento, se utilizará la fecha del día de ejecución en el código.--local-scheduler
: ejecuta las tareas de Luigi sin necesidad de iniciar un scheduler. Para omitirse es necesario estar corriendo un scheduler a través de comandoluigid
en otra consola.
La siguiente tabla se detallan todos los tasks del proyecto así como el NOMBRE_DE_SCRIPT
y NOMBRE_DE_CLASE
que
se requieren para ejecutarlo.
Etapa | Tarea | Script | Clase | Descripción |
---|---|---|---|---|
Ingesta | Ingesta | data_ingestion_task |
DataIngestionTask |
Guardar localmente en formato pickle los datos solicitados de la API de Chicago. |
Ingesta | Pruebas unitarias de ingesta | ingestion_testing_task |
IngestionTesting |
Realiza las pruebas unitarias de ingesta. |
Ingesta | Metadatos de ingesta | ingestion_metadata_task |
IngestionMetadataTask |
Genera los metadatos de DataIngestionTask . |
Almacenamiento | Almacenamiento | data_s3_upload_task |
DataS3UploadTask |
Sube a S3 la ingesta de datos obtenida en la DataIngestionTask . |
Almacenamiento | Pruebas unitarias de almacenamiento | data_s3_upload_testing_task |
DataS3UploadTestingTask |
Realiza las pruebas unitarias de almacenamiento. |
Almacenamiento | Metadatos de almacenamiento | data_s3_upload_metadata_task |
UploadMetadataTask |
Genera los metadatos de DataS3UploadTask . |
Limpieza | Limpieza | clean_data_task |
CleanDataTask |
Realiza la limpieza de los datos guardados en DataS3UploadTask . |
Limpieza | Pruebas unitarias de limpieza | clean_data_test_task |
CleanDataTestTask |
Realiza las pruebas unitarias de limpieza. |
Limpieza | Metadatos limpieza | clean_data_metadata_task |
CleanDataMetaTask |
Genera los metadatos de CleanDataTask . |
Feature engineering | Feature engineering | feature_engineering_task |
FeatureEngineeringTask |
Genera los features requeridos empleando los datos generados en CleanDataTask . |
Feature engineering | Pruebas unitarias de feature engineering | feature_eng_test_task |
FeatureEngTestTask |
Realiza las pruebas unitarias de feature engineering. |
Feature engineering | Metadatos de feature engineering | feature_engineering_metadata_task |
FeatureEngineeringMetaTask |
Genera los metadatos de FeatureEngineeringTask . |
Entrenamiento | Entrenamiento | training_task |
TrainingTask |
Realiza el entrenamiento de los modelos y los sube a S3. |
Entrenamiento | Pruebas unitarias de entrenamiento | training_test_task |
TrainingTestTask |
Realiza las pruebas unitarias de entrenamiento. |
Entrenamiento | Metadatos entrenamiento | training_metadata_task |
TrainingMetaTask |
Genera los metadatos de TrainingTestTask . |
Selección | Selección | selection_task |
SelectionTask |
A partir de los modelos entrenados selecciona el mejor basado en AUC. |
Selección | Pruebas unitarias de selección | selection_test_task |
SelectionTestTask |
Realiza las pruebas unitarias de selección |
Selección | Metadatos de selección | selection_metadata_task |
SelectionMetaTask |
Genera los metadatos de SelectionTask . |
Sesgo e inequidad | Sesgo e inequidad | aequitas_task |
AequitasTask |
A partir del modelo seleccionado, genera las métricas de bias y fairness. |
Sesgo e inequidad | Pruebas unitarias de Sesgo e inequidad | aequitas_test_task |
AequitasTestTask |
Realiza las pruebas unitarias de bias y fairness. |
Sesgo e inequidad | Metadatos de Sesgo e inequidad | aequitas_metadata_task |
AequitasMetaTask |
Genera los metadatos de bias y fairness. |
Esta rama cuenta con 17 tasks de orquestación. A continuación se muestra el DAG de Luigi:
Para ejecutar cualquiera de los siguientes tasks, una vez que se siguieron las instrucciones de configuración, se requiere introducir la siguiente línea de comandos en el directorio raíz del proyecto::
PYTHONPATH='.' luigi --module src.orchestration.<NOMBRE_DE_SCRIPT> <NOMBRE_DE_CLASE> [--local-scheduler] [--query-date <YYYY-MM-DD>]
--query-date
: se debe agregar la fecha de ingesta deseada en formato YYYY-MM-DD. Si se omite este argumento, se utilizará la fecha del día de ejecución en el código.--local-scheduler
: ejecuta las tareas de Luigi sin necesidad de iniciar un scheduler. Para omitirse es necesario estar corriendo un scheduler a través de comandoluigid
en otra consola.
La siguiente tabla se detallan todos los tasks del proyecto así como el NOMBRE_DE_SCRIPT
y NOMBRE_DE_CLASE
que
se requieren para ejecutarlo.
Etapa | Tarea | Script | Clase | Descripción |
---|---|---|---|---|
Ingesta | Ingesta | data_ingestion_task |
DataIngestionTask |
Guardar localmente en formato pickle los datos solicitados de la API de Chicago. |
Ingesta | Pruebas unitarias de ingesta | ingestion_testing_task |
IngestionTesting |
Realiza las pruebas unitarias de ingesta. |
Ingesta | Metadatos de ingesta | ingestion_metadata_task |
IngestionMetadataTask |
Genera los metadatos de DataIngestionTask . |
Almacenamiento | Almacenamiento | data_s3_upload_task |
DataS3UploadTask |
Sube a S3 la ingesta de datos obtenida en la DataIngestionTask . |
Almacenamiento | Pruebas unitarias de almacenamiento | data_s3_upload_testing_task |
DataS3UploadTestingTask |
Realiza las pruebas unitarias de almacenamiento. |
Almacenamiento | Metadatos de almacenamiento | data_s3_upload_metadata_task |
UploadMetadataTask |
Genera los metadatos de DataS3UploadTask . |
Limpieza | Limpieza | clean_data_task |
CleanDataTask |
Realiza la limpieza de los datos guardados en DataS3UploadTask . |
Limpieza | Pruebas unitarias de limpieza | clean_data_test_task |
CleanDataTestTask |
Realiza las pruebas unitarias de limpieza. |
Limpieza | Metadatos limpieza | clean_data_metadata_task |
CleanDataMetaTask |
Genera los metadatos de CleanDataTask . |
Feature engineering | Feature engineering | feature_engineering_task |
FeatureEngineeringTask |
Genera los features requeridos empleando los datos generados en CleanDataTask . |
Feature engineering | Pruebas unitarias de feature engineering | feature_eng_test_task |
FeatureEngTestTask |
Realiza las pruebas unitarias de feature engineering. |
Feature engineering | Metadatos de feature engineering | feature_engineering_metadata_task |
FeatureEngineeringMetaTask |
Genera los metadatos de FeatureEngineeringTask . |
Predicción | Predicción de ingesta continua | prediction_task |
PredictionTask |
Realiza las predicciones de la ingesta continua y la almacena en S3 |
Predicción | Pruebas unitarias de predicción | prediction_test_task |
PredictionTestTask |
Realiz pruebas unitarias de PredictionTask . |
Predicción | Metadatos de predicción | prediction_metadata_task |
PredictionMetaTask |
Genera los metadatos de PredictionTask . |
API | Almacenamiento en DB de API | api_storage_task |
ApiStorageTask |
Almacena en la base de datos de la API las predicciones. |
Monitoreo | Almacenamiento en DB de monitorei | monitoring_task |
MonitoringTask |
Almacena en la base de datos del monitoreo las predicciones. |
Algunos ejemplos (adecuados para el checkpoint 7) son:
PYTHONPATH='.' luigi --module src.orchestration.prediction_metadata_task PredictionMetaTask --local-scheduler
PYTHONPATH='.' luigi --module src.orchestration.api_storage_task ApiStorageTask --local-scheduler
Ejemplo: Almacenamiento de predicciones en base de datos de monitoreo con ingesta continua del 20 de abril de 2021
PYTHONPATH='.' luigi --module src.orchestration.monitoring_task MonitoringTask --local-scheduler --query-date 2021-04-20
Las pruebas unitarias de ingesta son las siguientes:
test_number_of_columns
: verifica el número de columnas en la ingestatest_df_not_empty
: verifica que la ingesta no venga sin observacionestest_df_columns
: verifica por nombre que se cuente con todas las variables esperadas
Las pruebas unitarias de almacenamiento son las siguientes:
test_df_are_equal
: verifica que el contenido de la ingesta y del df almacenado en S3 sea idéntico renglón por renglón y tipos de datos
Las pruebas unitarias de limpieza son las siguientes:
test_df_smaller_equal
: verifica que el dataframe post limpieza no tenga más variables que el que proviene de almacenamientotest_df_not_empty
: verifica que el dataframe no esté vacíotest_df_smaller_rows
: verifica que el dataframe post limpieza no tenga más observaciones que el que proviene de almacenamientotest_df_columns
: verifica que las variables del df sean un subconjunto de las originales ingestadas
Las pruebas unitarias de feature engineering son las siguientes:
test_df_not_empty
: verifica que los siguientes dataframes existan en S3:- x_train (en caso de activar el flag de
--training
) - y_train (en caso de activar el flag de
--training
) - x_test (en caso de activar el flag de
--training
) - y_test (en caso de activar el flag de
--training
) - self.X (en caso de no activar el flag de
--training
) lo que implica que se utilizará para predicción - self.y (en caso de no activar el flag de
--training
) lo que implica que se utilizará para predicción
- x_train (en caso de activar el flag de
Las pruebas unitarias de entrenamiento son las siguientes:
test_enough_models
: verifica que después del proceso de entrenamiento existan al menos dos modelos.
Nota: si se solicita a la task de Luigi más de 2 modelos (por el momento) esta prueba unitaria falla, ya que nuestro proyecto aún no entrena más de dos. Un ejemplo de este caso sería el siguiente comando:
PYTHONPATH='.' luigi --module src.orchestration.training_metadata_task TrainingMetaTask --local-scheduler --desired-models 10
Para pruebas rápidas se recomienda no incluir el flag de --historic
para que el entrenamiento sea más rápido.
Para producción es obligatorio incluirlo.
Las pruebas unitarias de selección son las siguientes:
test_classes
: verifica que el modelo seleccionado tenga como output las clases 0 y 1, aunque se puede especificar otras en caso de que esto cambie en el futuro a través del parámetro--desired-classes
.
Nota: si se solicita cualquier combinación de clases que no sean 0 y 1, esta prueba unitaria fallará, ya que nuestros scripts de modelado están diseñados para solo generar esas dos etiquetas de predicción. Un ejemplo de este caso sería el siguiente comando:
PYTHONPATH='.' luigi --module src.orchestration.selection_metadata_task SelectionMetaTask --local-scheduler --desired-classes '[0, 1, 2]'
Para pruebas rápidas se recomienda no incluir el flag de --historic
para que el entrenamiento sea más rápido.
Para producción es obligatorio incluirlo.
Las pruebas unitarias de sesgo e inequidad son las siguientes:
-
test_df_not_empty
: verifica que en verdad se estén generando las métricas de sesgo e inequidad al checar que no estén vacíos los df. -
test_number_of_categories
: verifica que existan al menos dos categorías de las variables de atributos protegidos. Este comportamiento se puede modificar al usar el parámetro--min-categories
Nota: si se solicita un número de categorías mayor a las que existen naturalmente en los datos, esta prueba unitaria fallará. Un ejemplo de este caso sería el siguiente comando:
PYTHONPATH='.' luigi --module src.orchestration.aequitas_metadata_task AequitasMetaTask --min-categories 100
Para pruebas rápidas se recomienda no incluir el flag de --historic
para que el entrenamiento sea más rápido.
Para producción es obligatorio incluirlo.
Las pruebas unitarias de predicción son las siguientes:
test_not_empty
: verifica que se haya generado por lo menos una predicción.test_labels
: verifica que las etiquetas predichas sean 0 o 1.test_min_score
: verifica que el score mínimo predicho sea 0.0.test_max_score
: verifica que el score máximo predicho sea 1.0.
Nota: si se solicita un score máximo o mínimo que no se encuentren en el intervalo [0, 1] las pruebas unitarias de
test_min_score
y/o test_max_score
fallarán. Un ejemplo de este caso sería el siguiente comando:
PYTHONPATH='.' luigi --module src.orchestration.prediction_metadata_task PredictionMetaTask --max-desired-score -1.0 --local-scheduler
-
¿Cuáles son los atributos protegidos? Variable
facility_type
-
¿Qué grupos de referencia tiene cada atributo protegido?, explica el por qué. Restaurante. Debido a que en mayor proporción son los que obtienen inspecciones favorables (pasan la visita) y buscamos identificar si existe algún tipo de ventaja relativa.
-
¿Tu modelo es punitivo o asistivo? explica por qué. Asistivo. La decisión se está tomando desde el punto de vista del establecimiento, dado ese contexto, el modelo tiene la función de identificar si este pasará o no la inspección y tomar acciones pertinentes para enfrentar la visita.
-
¿Qué métricas cuantificarás ocuparás en sesgo e inequidad? explica por qué. FOR. Nos interesa identificar y cuantificar la presencia de sesgo hacia algún tipo de establecimiento respecto a no ser clasificado como etiqueta positiva (pasar la inspección). En ese sentido, nuestro objetivo sería lograr paridad en las tasas de falsos negativos en todos los grupos.
Para poder llevar a cabo este procedimiento, asegúrese de tener el documento de credentials.yaml
tal cual se indica
en la sección de Credenciales.
Para el bucket de S3 se utilizará una constante definida en
src/utils/constants.py
sin embargo, para uso externo no es posible utilizar esta constante sin modificarla,
ya que, el bucket empleado en el proyecto no tiene acceso público. Por ello la variable bucket_name
, debe
ser modificada por el nombre de un bucket que usted pueda acceder a partir de sus credenciales de AWS que se
encuentran en el archivo de credenciales mencionado.
Al encontrarse en el directorio raíz del proyecto, ejecutar las siguientes instrucciones:
-
Importar las funciones y constantes necesarias para la ingesta histórica
from src.pipeline.ingesta_almacenamiento import get_client, ingesta_inicial, get_s3_resource, guardar_ingesta from src.utils.constants import bucket_name from src.utils.general import get_upload_path
-
Obtener el cliente que permite conectarse a la API de datos de la Ciudad de Chicago
client = get_client()
-
Obtener las observaciones históricas hasta la fecha de búsqueda de la API de inspecciones de comida en la Ciudad de Chicago
results_ingesta_inicial = ingesta_inicial(client=client)
Nota: esta función usa por default el parámetro
limit
con valor de300,000
, pero es posible llamarla modificando dicho parámetro en caso de la ingesta histórica implique más observaciones en un futuro. Adicionalmente, se puede proveer un parámetroquery_date
con un dato de tipodatetime
para especificar la fecha a partir de la cual se quiere recolectar datos hacia atrás. En caso de no suministrarla, como en el ejemplo, la función obtendrá todas las observaciones de la base de datos hasta el día de ejecución. Este es el comportamiento buscado para el proyecto por lo que en general se llamará sin este parámetro y solamente una ocasión. -
Obtener los recursos de S3 de las credenciales de AWS
s3_resource = get_s3_resource()
-
Obtener la ruta donde se deben guardar los datos en el bucket
bucket_path = get_upload_path(historic=True)
Nota: esta función le devuelve la ruta correcta de acuerdo al día que se llamó. Ejemplo:
ingestion/initial/historic-inspections-2021-02-22.pkl
-
Guardado de la ingesta en el bucket de S3
guardar_ingesta(bucket_name=bucket_name, bucket_path=bucket_path, data=results_ingesta_inicial, s3_resource=s3_resource)
Nota: recuerde que la variable
bucket_name
debe ser sustituida por el nombre de un bucket al que usted tenga acceso.
Al encontrarse en el directorio raíz del proyecto, ejecutar las siguientes instrucciones:
-
Importar las funciones y constantes necesarias para la ingesta consecutiva
from src.pipeline.ingesta_almacenamiento import get_client, ingesta_consecutiva, get_s3_resource, guardar_ingesta from src.utils.constants import bucket_name from src.utils.general import get_upload_path
-
Obtener el cliente que permite conectarse a la API de datos de la Ciudad de Chicago
client = get_client()
-
Obtener las observaciones históricas hasta la fecha de búsqueda de la API de inspecciones de comida en la Ciudad de Chicago
results_ingesta_consecutiva = ingesta_consecutiva(client=client)
Nota: esta función usa por default el parámetro
limit
con valor de1,000
, pero es posible llamarla modificando dicho parámetro en caso de la ingesta consecutiva implique más observaciones en un futuro. Adicionalmente, se puede proveer un parámetroquery_date
con un dato de tipodatetime
para especificar la fecha a partir de la cual se quiere recolectar datos. En caso de no suministrarla, como en el ejemplo, la función calculará la fecha de 7 días atrás con respecto al día que se ejecuta. Este es el comportamiento buscado para el proyecto por lo que en general se llamará sin este parámetro. -
Obtener los recursos de S3 de las credenciales de AWS
s3_resource = get_s3_resource()
-
Obtener la ruta donde se deben guardar los datos en el bucket
bucket_path = get_upload_path(historic=False)
Nota: esta función le devuelve la ruta correcta de acuerdo al día que se llamó. Ejemplo:
ingestion/consecutive/consecutive-inspections-2021-02-22.pkl
-
Guardado de la ingesta en el bucket de S3
guardar_ingesta(bucket_name=bucket_name, bucket_path=bucket_path, data=results_ingesta_consecutiva, s3_resource=s3_resource)
Nota: recuerde que la variable
bucket_name
debe ser sustituida por el nombre de un bucket al que usted tenga acceso.
Los datos se almacenan en formato pickle en S3, correspondiendo a un JSON. Para poder usarlos como un dataframe de pandas se recomienda usar el siguiente bloque de código para evitar conflictos una vez que ya se descargó de S3:
import pandas as pd
ingestion_df = pd.DataFrame.from_dict(ingestions_pkl)