Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implemented index alias to make import atomic #46

Merged
merged 4 commits into from
May 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 71 additions & 71 deletions scripts/import-channel
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ click_log.basic_config(logger)


CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))


# Version tag that create_index_name() embeds in every concrete index name
# ("evaluation-<version>-..."). NOTE(review): presumably meant to be bumped
# whenever the index mappings/analysis change incompatibly -- confirm.
INDEX_SCHEMA_VERSION = 1
ANALYSIS = {
"analyzer": {
"nixAttrName": {
Expand Down Expand Up @@ -70,6 +69,43 @@ ANALYSIS = {
},
},
}
# Elasticsearch mapping passed as the "mappings" body when a packages index
# is created.
PACKAGES_MAPPING = {
    "properties": {
        # Analysed with the custom "nixAttrName" analyzer; the "raw"
        # sub-field keeps the value as an un-analysed keyword as well.
        "attr_name": {
            "type": "text",
            "analyzer": "nixAttrName",
            "fields": {"raw": {"type": "keyword"}},
        },
        "attr_set": {"type": "keyword"},
        "pname": {"type": "keyword"},
        "pversion": {"type": "keyword"},
        "description": {"type": "text"},
        "longDescription": {"type": "text"},
        # Nested objects, each with its own text sub-fields.
        "license": {
            "type": "nested",
            "properties": {
                "fullName": {"type": "text"},
                "url": {"type": "text"},
            },
        },
        "maintainers": {
            "type": "nested",
            "properties": {
                "name": {"type": "text"},
                "email": {"type": "text"},
                "github": {"type": "text"},
            },
        },
        "platforms": {"type": "keyword"},
        "position": {"type": "text"},
        "homepage": {"type": "keyword"},
    },
}
# Elasticsearch mapping passed as the "mappings" body when an options index
# is created.
OPTIONS_MAPPING = {
    "properties": {
        "option_name": {"type": "keyword"},
        "description": {"type": "text"},
        "type": {"type": "keyword"},
        "default": {"type": "text"},
        "example": {"type": "text"},
        "source": {"type": "keyword"},
    },
}


def get_last_evaluation(channel):
Expand Down Expand Up @@ -235,75 +271,30 @@ def get_options(evaluation):
return len(options), gen


def recreate_index(es, channel):
packages_index = f"{channel}-packages"
if es.indices.exists(packages_index):
es.indices.delete(index=packages_index)
logger.debug(
f"recreate_index: index '{packages_index}' already exists and was deleted"
)
def create_index(es, index, mapping):
if es.indices.exists(index):
logger.debug(f"create_index: index '{index}' already exists")
return
es.indices.create(
index=packages_index,
body=dict(
settings=dict(number_of_shards=1, analysis=ANALYSIS),
mappings=dict(
properties=dict(
attr_name=dict(
type="text",
analyzer="nixAttrName",
fields={"raw": {"type": "keyword"}},
),
attr_set=dict(type="keyword"),
pname=dict(type="keyword"),
pversion=dict(type="keyword"),
description=dict(type="text"),
longDescription=dict(type="text"),
license=dict(
type="nested",
properties=dict(
fullName=dict(type="text"), url=dict(type="text"),
),
),
maintainers=dict(
type="nested",
properties=dict(
name=dict(type="text"),
email=dict(type="text"),
github=dict(type="text"),
),
),
platforms=dict(type="keyword"),
position=dict(type="text"),
homepage=dict(type="keyword"),
),
),
),
index=index,
body={
"settings": {"number_of_shards": 1, "analysis": ANALYSIS},
"mappings": mapping,
},
)
logger.debug(f"recreate_index: index '{packages_index}' was created")

options_index = f"{channel}-options"
if es.indices.exists(options_index):
es.indices.delete(index=options_index)
logger.debug(
f"recreate_index: index '{options_index}' already exists and was deleted"
)
es.indices.create(
index=options_index,
body=dict(
settings=dict(number_of_shards=1, analysis=ANALYSIS),
mappings=dict(
properties=dict(
option_name=dict(type="keyword"),
description=dict(type="text"),
type=dict(type="keyword"),
default=dict(type="text"),
example=dict(type="text"),
source=dict(type="keyword"),
),
),
),
logger.debug(f"create_index: index '{index}' was created")


def create_index_name(type_, channel, evaluation):
    """Build the alias name and the concrete index name for one import.

    Args:
        type_: Kind of documents stored in the index; callers in this file
            pass "packages" or "options".
        channel: Channel name, used verbatim in both names.
        evaluation: Mapping that must provide the keys
            "revisions_since_start" and "git_revision".

    Returns:
        A ``(alias, index)`` tuple: ``latest-<channel>-<type_>`` and an
        ``evaluation-...`` name that additionally carries the schema version
        and the evaluation's revision data.

    Raises:
        KeyError: If ``evaluation`` lacks one of the required keys.
    """
    alias = f"latest-{channel}-{type_}"
    revisions = evaluation["revisions_since_start"]
    git_revision = evaluation["git_revision"]
    index = (
        f"evaluation-{INDEX_SCHEMA_VERSION}-{channel}-"
        f"{revisions}-{git_revision}-{type_}"
    )
    return alias, index
logger.debug(f"recreate_index: index '{options_index}' was created")


def update_alias(es, name, index):
    """Point the alias ``name`` at ``index`` only, in one atomic request.

    A plain ``put_alias`` merely adds ``index`` to the alias, so after a
    second import the alias would resolve to both the old and the new index
    and searches through it would return documents from every past import.
    Here the removal of stale bindings and the addition of the new one are
    sent as a single ``update_aliases`` call, which Elasticsearch applies
    atomically, so readers never observe the alias missing or duplicated.

    Args:
        es: Elasticsearch client.
        name: Alias name to (re)assign.
        index: Name of the index the alias must point to afterwards.
    """
    actions = []
    if es.indices.exists_alias(name=name):
        # get_alias returns a mapping keyed by the names of the indices that
        # currently carry the alias; detach every one except the target.
        actions.extend(
            {"remove": {"index": stale_index, "alias": name}}
            for stale_index in es.indices.get_alias(name=name)
            if stale_index != index
        )
    actions.append({"add": {"index": index, "alias": name}})
    es.indices.update_aliases(body={"actions": actions})
    logger.debug(f"'{name}' alias now points to '{index}' index")


@click.command()
Expand All @@ -324,7 +315,12 @@ def main(es_url, channel, verbose):

evaluation = get_last_evaluation(channel)
es = elasticsearch.Elasticsearch([es_url])
recreate_index(es, channel)

# ensure indexes exist
packages_alias, packages_index = create_index_name("packages", channel, evaluation)
options_alias, options_index = create_index_name("options", channel, evaluation)
create_index(es, packages_index, PACKAGES_MAPPING)
create_index(es, options_index, OPTIONS_MAPPING)

# write packages
number_of_packages, gen_packages = get_packages(evaluation)
Expand All @@ -333,7 +329,7 @@ def main(es_url, channel, verbose):
progress = tqdm.tqdm(unit="packages", total=number_of_packages)
successes = 0
for ok, action in elasticsearch.helpers.streaming_bulk(
client=es, index=f"{channel}-packages", actions=gen_packages()
client=es, index=packages_index, actions=gen_packages()
):
progress.update(1)
successes += ok
Expand All @@ -346,12 +342,16 @@ def main(es_url, channel, verbose):
progress = tqdm.tqdm(unit="options", total=number_of_options)
successes = 0
for ok, action in elasticsearch.helpers.streaming_bulk(
client=es, index=f"{channel}-options", actions=gen_options()
client=es, index=options_index, actions=gen_options()
):
progress.update(1)
successes += ok
print("Indexed %d/%d options" % (successes, number_of_options))

# update alias
update_alias(es, packages_alias, packages_index)
update_alias(es, options_alias, options_index)


if __name__ == "__main__":
main()
Expand Down
2 changes: 1 addition & 1 deletion src/Page/Options.elm
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ makeRequest :
makeRequest options channel query from size =
ElasticSearch.makeRequest
"option_name"
("nixos-" ++ channel ++ "-options")
("latest-nixos-" ++ channel ++ "-options")
decodeResultItemSource
options
query
Expand Down
2 changes: 1 addition & 1 deletion src/Page/Packages.elm
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ makeRequest :
makeRequest options channel query from size =
ElasticSearch.makeRequest
"attr_name"
("nixos-" ++ channel ++ "-packages")
("latest-nixos-" ++ channel ++ "-packages")
decodeResultItemSource
options
query
Expand Down