Skip to content

Commit

Permalink
Analytics in prediction mode (#365)
Browse files Browse the repository at this point in the history
* Also possible to run with single number additional flownodes but multple layers

* Apply suggestions from code review

Co-authored-by: Wouter J. de Bruin <9119793+wouterjdb@users.noreply.github.com>

* Added changelog

* add analysis to pred config parser

* Config pars pred reading in ref_sim and analysis metrics

* Runs again  without ref_sim and analysis keywords in config

* Analytics enabled for prediction mode

* Pass keyword mode to analytics workflow to determine if running in ahm or prediction mode

* update abstract method of get_well_connections to fix pylint bug

* Remove perforation argument from FlowData from test

* Fix type error

* Fix type error

* Move all but abs_path transforamtion function to seperate script

* pylint fix

* update docstring get_well_connections

* black reformat

* Added Changelog

* Added Changelog

* Update CHANGELOG.md

* Suggested changes from review

* fix linting error

* update description of analysis in config parser

* Clean up of analytics code

* Restructure of ahm analytics tests

* Sorting of iteration fixed

* Restructure of code. Created forward_models in ert folder.

* pyling bufix

* Bugfix error in ERT run

* Add render realization to forward ert models

* pylint bugfix

* Update setup.py

* Update changelog

* Review fixes

Co-authored-by: LonnekevB <lonneke.vanbijsterveldt@tno.nl>
Co-authored-by: Lonneke van Bijsterveldt <74047453+LonnekevB@users.noreply.github.com>
  • Loading branch information
3 people authored Mar 19, 2021
1 parent 8510c85 commit d410aa2
Show file tree
Hide file tree
Showing 27 changed files with 429 additions and 297 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ This project adheres to [Semantic Versioning](https://semver.org/).
## Unreleased

### Added
- [#349](https://github.com/equinor/flownet/pull/349) Analytics workflow now also available for prediction mode.
- [#351](https://github.com/equinor/flownet/pull/351) Added simple plotting tool that allows for plotting of FlowNet ensembles and observations.

### Fixes
- [#349](https://github.com/equinor/flownet/pull/349) Fixes bug where the parquet parameters file would not be saved for iterations larger than 9.

### Changes
- [#349](https://github.com/equinor/flownet/pull/349) Structure change of the code. Moved all forward models called from ERT to a seperate folder ert/forward_models. Scripts moved: delete_simulation_output, save_iteration_parameters, iteration_analytics, render_realization and flow_job.
- [#347](https://github.com/equinor/flownet/pull/347) Additional flow nodes is now allowed to be either a list (equal length of number of layers) or a single integer (which will be split over the layers according to volume of concave hull).

## [0.5.1] - 2021-03-03
Expand Down
16 changes: 9 additions & 7 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,18 @@
use_scm_version=True,
package_dir={"": "src"},
packages=find_packages("src"),
package_data={"flownet": ["templates/*", "static/*", "ert/FLOW_SIMULATION"]},
package_data={
"flownet": ["templates/*", "static/*", "ert/forward_models/FLOW_SIMULATION"]
},
entry_points={
"ert": ["flow = flownet.ert._flow_job"],
"ert": ["flow = flownet.ert.forward_models._flow_job"],
"console_scripts": [
"flownet=flownet._command_line:main",
"flownet_render_realization=flownet.realization:render_realization",
"flownet_delete_simulation_output=flownet.ahm:delete_simulation_output",
"flownet_run_flow=flownet.ert._flow_job:run_flow",
"flownet_save_iteration_parameters=flownet.ahm:save_iteration_parameters",
"flownet_save_iteration_analytics=flownet.ahm:save_iteration_analytics",
"flownet_render_realization=flownet.ert.forward_models:render_realization",
"flownet_delete_simulation_output=flownet.ert.forward_models:delete_simulation_output",
"flownet_run_flow=flownet.ert.forward_models:run_flow",
"flownet_save_iteration_parameters=flownet.ert.forward_models:save_iteration_parameters",
"flownet_save_iteration_analytics=flownet.ert.forward_models:save_iteration_analytics",
"flownet_plot_results=flownet.utils.plot_results:main",
],
},
Expand Down
7 changes: 1 addition & 6 deletions src/flownet/ahm/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,2 @@
from ._assisted_history_matching import (
AssistedHistoryMatching,
delete_simulation_output,
save_iteration_parameters,
)
from ._ahm_iteration_analytics import save_iteration_analytics
from ._assisted_history_matching import AssistedHistoryMatching
from ._run_ahm import run_flownet_history_matching
105 changes: 1 addition & 104 deletions src/flownet/ahm/_assisted_history_matching.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,8 @@
import argparse
import concurrent.futures
import glob
import json
import os
import pathlib
import re
import shutil
from typing import List, Dict, Tuple

from typing import List
from configsuite import ConfigSuite
import jinja2
import numpy as np
import pandas as pd

from ..ert import create_ert_setup, run_ert_subprocess
from ..realization import Schedule
Expand Down Expand Up @@ -160,97 +151,3 @@ def report(self):
else " None ",
)
print("")


def delete_simulation_output():
"""
This function is called by a forward model in ERT, deleting unnecessary
simulation output files.
Returns:
Nothing
"""
parser = argparse.ArgumentParser(prog="Delete simulation output.")

parser.add_argument(
"ecl_base", type=str, help="Base name of the simulation DATA file"
)

args = parser.parse_args()

for suffix in ["EGRID", "INIT", "UNRST", "LOG", "PRT"]:
if os.path.exists(f"{args.ecl_base}.{suffix}"):
os.remove(f"{args.ecl_base}.{suffix}")


def _load_parameters(runpath: str) -> Tuple[int, Dict]:
"""
Internal helper function to load parameter.json files in
parallel.
Args:
runpath: Path to where the realization is run.
Returns:
Dictionary with the realizations' parameters.
"""
realization = int(re.findall(r"[0-9]+", runpath)[-2])
parameters = json.loads((pathlib.Path(runpath) / "parameters.json").read_text())

return realization, parameters["FLOWNET_PARAMETERS"]


def save_iteration_parameters():
"""
This function is called as a pre-simulation workflow in ERT, saving all
parameters of an iteration to a file.
The resulting dataframe is saved as a gzipped parquet file using a PyArrow table
and has the following format (example for 5 realizations and 2 parameters):
| index = realization | parameter 1 | parameter 2 |
|=====================|=============|=============|
| 1 | x.x | x.x |
| 3 | x.x | x.x |
| 5 | x.x | x.x |
| 4 | x.x | x.x |
| 2 | x.x | x.x |
Mind that the dataframe is not ordered.
Returns:
Nothing
"""
parser = argparse.ArgumentParser(prog="Save iteration parameters to a file.")
parser.add_argument("runpath", type=str, help="Path to the ERT runpath.")
args = parser.parse_args()
args.runpath = args.runpath.replace("%d", "*")

print("Saving ERT parameters to file...", end=" ")

iteration = int(re.findall(r"[0-9]+", sorted(glob.glob(args.runpath))[-1])[-1])
runpath_list = glob.glob(args.runpath[::-1].replace("*", str(iteration), 1)[::-1])
realizations_dict = {}

with concurrent.futures.ProcessPoolExecutor() as executor:
for result in executor.map(_load_parameters, runpath_list):
realizations_dict[result[0]] = result[1]

pd.DataFrame(
[parameters for _, parameters in realizations_dict.items()],
index=realizations_dict.keys(),
).to_parquet(
f"parameters_iteration-{iteration}.parquet.gzip",
index=True,
engine="pyarrow",
compression="gzip",
)

shutil.copyfile(
f"parameters_iteration-{iteration}.parquet.gzip",
"parameters_iteration-latest.parquet.gzip",
)
print("[Done]")
5 changes: 3 additions & 2 deletions src/flownet/ahm/_run_ahm.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,10 +399,11 @@ def run_flownet_history_matching(
field_data = FlowData(
config.flownet.data_source.simulation.input_case,
layers=config.flownet.data_source.simulation.layers,
perforation_handling_strategy=config.flownet.perforation_handling_strategy,
)
df_production_data: pd.DataFrame = field_data.production
df_well_connections: pd.DataFrame = field_data.well_connections
df_well_connections: pd.DataFrame = field_data.get_well_connections(
config.flownet.perforation_handling_strategy
)

# Load log data if required
df_well_logs: Optional[pd.DataFrame] = (
Expand Down
58 changes: 9 additions & 49 deletions src/flownet/config_parser/_config_parser.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
import warnings
import os
import pathlib
from typing import Dict, Optional, List, Union
from typing import Dict, Optional, List

import yaml
import configsuite
from configsuite import types, MetaKeys as MK, ConfigSuite
import pandas as pd

from ._merge_configs import merge_configs
from ._config_transformations import (
_integer_to_list,
_str_none_to_none,
_to_lower,
_to_upper,
)
from ..data.from_flow import FlowData


Expand All @@ -31,53 +37,6 @@ def create_schema(config_folder: Optional[pathlib.Path] = None) -> Dict:
"""

@configsuite.transformation_msg("Convert integer to list")
def _integer_to_list(input_data: Union[List, int]) -> List:
"""
Converts integer to list with single item.
Args:
input_data (Union[List, int]):
Returns:
The input_data. If it wasn't a list yet is will be turned into a list.
"""
if isinstance(input_data, int):
input_data = [input_data]
return input_data

@configsuite.transformation_msg("Convert 'None' to None")
def _str_none_to_none(
input_data: Union[str, int, float, None]
) -> Union[str, int, float, None]:
"""
Converts "None" to None
Args:
input_data (Union[str, int, float, None]):
Returns:
The input_data. If the input is "None" or "none" it is converted to None (str to None)
"""
if isinstance(input_data, str):
if input_data.lower() == "none":
return None

return input_data

@configsuite.transformation_msg("Convert string to lower case")
def _to_lower(input_data: Union[List[str], str]) -> Union[List[str], str]:
if isinstance(input_data, str):
return input_data.lower()

return [x.lower() for x in input_data]

@configsuite.transformation_msg("Convert string to upper case")
def _to_upper(input_data: Union[List[str], str]) -> Union[List[str], str]:
if isinstance(input_data, str):
return input_data.upper()

return [x.upper() for x in input_data]

@configsuite.transformation_msg("Convert input string to absolute path")
def _to_abs_path(path: Optional[str]) -> str:
"""
Expand Down Expand Up @@ -602,7 +561,8 @@ def _to_abs_path(path: Optional[str]) -> str:
},
MK.Transformation: _to_upper,
MK.Description: "List of accuracy metrics to be computed "
"in FlowNet analysis workflow",
"in FlowNet analysis workflow. "
"Supported metrics: MSE, RMSE, NRMSE, MAE, NMAE, R2",
},
"quantity": {
MK.Type: types.List,
Expand Down
Loading

0 comments on commit d410aa2

Please sign in to comment.