Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add remove_query rpc #155

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 25 additions & 16 deletions noria-server/src/bin/zk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,26 +57,41 @@ fn main() {
.required_unless("clean")
.help("Print current configuration to stdout."),
)
.arg(
Arg::with_name("skip-controller")
.long("--skip-controller")
.takes_value(false)
.help("Don't print current controller."),
)
.get_matches();

let deployment = matches.value_of("deployment").unwrap();
let zookeeper_addr = format!("{}/{}", matches.value_of("zookeeper").unwrap(), deployment);
let clean = matches.is_present("clean");
let dump = matches.is_present("show");
let skip_controller = matches.is_present("skip-controller");

let zk = ZooKeeper::connect(&zookeeper_addr, Duration::from_secs(1), EventWatcher).unwrap();

if dump {
let (ref current_ctrl, ref _stat) = match zk.get_data(CONTROLLER_KEY, false) {
Ok(data) => data,
Err(e) => match e {
ZkError::NoNode => {
println!("no current Soup controller in Zookeeper!");
return;
}
_ => panic!("{:?}", e),
},
};
if !skip_controller {
let (ref current_ctrl, ref _stat) = match zk.get_data(CONTROLLER_KEY, false) {
Ok(data) => data,
Err(e) => match e {
ZkError::NoNode => {
println!("no current Soup controller in Zookeeper!");
return;
}
_ => panic!("{:?}", e),
},
};

let controller: Value = serde_json::from_slice(current_ctrl).unwrap();
println!(
"Current Soup controller in Zookeeper:\n{}\n\n",
serde_json::to_string_pretty(&controller).unwrap()
);
}

let (ref current_data, ref _stat) = match zk.get_data(STATE_KEY, false) {
Ok(data) => data,
Expand All @@ -89,12 +104,6 @@ fn main() {
},
};

let controller: Value = serde_json::from_slice(current_ctrl).unwrap();
println!(
"Current Soup controller in Zookeeper:\n{}\n\n",
serde_json::to_string_pretty(&controller).unwrap()
);

let state: Value = serde_json::from_slice(current_data).unwrap();
println!(
"Current Soup configuration in Zookeeper:\n{}",
Expand Down
56 changes: 53 additions & 3 deletions noria-server/src/controller/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,12 @@ impl ControllerInner {
self.remove_nodes(vec![args].as_slice())
.map(|r| json::to_string(&r).unwrap())
}),
(Method::POST, "/remove_query") => json::from_slice(&body)
.map_err(|_| StatusCode::BAD_REQUEST)
.map(|args| {
self.remove_query(authority, args)
.map(|r| json::to_string(&r).unwrap())
}),
_ => Err(StatusCode::NOT_FOUND),
}
}
Expand Down Expand Up @@ -1179,13 +1185,15 @@ impl ControllerInner {
// This query leaf node has children -- typically, these are readers, but they can also
// include egress nodes or other, dependent queries.
let mut has_non_reader_children = false;
let mut non_readers = Vec::default();
let readers: Vec<_> = self
.ingredients
.neighbors_directed(leaf, petgraph::EdgeDirection::Outgoing)
.filter(|ni| {
if self.ingredients[*ni].is_reader() {
true
} else {
non_readers.push(*ni);
has_non_reader_children = true;
false
}
Expand All @@ -1198,8 +1206,22 @@ impl ControllerInner {
"not removing node {} yet, as it still has non-reader children",
leaf.index()
);
unreachable!();
// deleting non-reader and its children
for nr in non_readers {
if !self.ingredients[nr].is_base() {
let mut children = Vec::default();
self.ingredients
.neighbors_directed(nr, petgraph::EdgeDirection::Outgoing)
.for_each(|ni| children.push(ni));
for child in children {
self.remove_leaf(child);
}
self.remove_leaf(nr);
}
}
//unreachable;
}

// nodes can have only one reader attached
assert!(readers.len() <= 1);
debug!(
Expand All @@ -1211,7 +1233,8 @@ impl ControllerInner {
removals.push(readers[0]);
leaf = readers[0];
} else {
unreachable!();
//unreachable!();
//
}
}

Expand Down Expand Up @@ -1242,7 +1265,7 @@ impl ControllerInner {
.neighbors_directed(parent, petgraph::EdgeDirection::Outgoing)
.count() == 0
{
nodes.push(parent);
// nodes.push(parent);
}
}

Expand Down Expand Up @@ -1349,6 +1372,33 @@ impl ControllerInner {
})
}
}
fn remove_query<A: Authority + 'static>(
&mut self,
authority: &Arc<A>,
qname: &str,
) -> Result<(), String> {
let old = self.recipe.clone();
self.recipe.remove_query(qname);
let updated = self.recipe.clone();
let replaced = old.replace(updated).unwrap();

let activation_result = self.apply_recipe(replaced);
if authority
.read_modify_write(STATE_KEY, |state: Option<ControllerState>| match state {
None => unreachable!(),
Some(ref state) if state.epoch > self.epoch => Err(()),
Some(mut state) => {
state.recipe_version = self.recipe.version();
// state.recipes.push(updated_recipe.clone());
Ok(state)
}
})
.is_err()
{
return Err("Failed to persist recipe extension".to_owned());
}
Ok(())
}
}

impl Drop for ControllerInner {
Expand Down
18 changes: 10 additions & 8 deletions noria-server/src/controller/recipe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ impl Recipe {
// them twice.
self.inc.as_mut().unwrap().remove_base(&ctq.table.name);
match self.prior.as_ref().unwrap().node_addr_for(&ctq.table.name) {
Ok(ni) => Some(ni),
Ok(ni) => Some(vec![ni]),
Err(e) => {
crit!(
self.log,
Expand All @@ -473,15 +473,17 @@ impl Recipe {
}
}
}
_ => self
.inc
.as_mut()
.unwrap()
.remove_query(n.as_ref().unwrap(), mig),
_ => {
self
.inc
.as_mut()
.unwrap()
.remove_query(n.as_ref().unwrap(), mig)
}
}
})
.flatten()
.collect();

Ok(result)
}

Expand Down Expand Up @@ -650,7 +652,7 @@ impl Recipe {
self.prior.as_ref().map(|p| &**p)
}

fn remove_query(&mut self, qname: &str) -> bool {
crate fn remove_query(&mut self, qname: &str) -> bool {
let qid = self.aliases.get(qname).cloned();
if qid.is_none() {
warn!(self.log, "Query {} not found in expressions", qname);
Expand Down
41 changes: 32 additions & 9 deletions noria-server/src/controller/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ crate struct SqlIncorporator {
/// Active universes mapped to the group they belong to.
/// If an user universe, mapped to None.
universes: HashMap<Option<DataType>, Vec<UniverseId>>,

compound_mir_queries: HashMap<String, MirQuery>,
}

impl Default for SqlIncorporator {
Expand All @@ -87,6 +89,7 @@ impl Default for SqlIncorporator {

reuse_type: ReuseConfigType::Finkelstein,
universes: HashMap::default(),
compound_mir_queries: HashMap::default(),
}
}
}
Expand Down Expand Up @@ -451,10 +454,17 @@ impl SqlIncorporator {
.iter()
.enumerate()
.map(|(i, sq)| {
Ok(self
.add_select_query(&format!("{}_csq_{}", query_name, i), &sq.1, false, mig)?
.1
.unwrap())
let q_name = &format!("{}_csq_{}", query_name, i);
let res = self.add_select_query(q_name, &sq.1, false, mig)?.1;
if res.is_none() {
let hash = self.named_queries[q_name].clone();
let query = self.mir_queries[&(hash, mig.universe())].clone();
debug!(self.log, "Fetched a mir_query from before {:?}", query);
Ok(query)
} else {
Ok(res.unwrap())
}

})
.collect();

Expand All @@ -468,6 +478,7 @@ impl SqlIncorporator {
);

let qfp = mir_query_to_flow_parts(&mut combined_mir_query, &mut mig, None);
self.compound_mir_queries.insert(query_name.to_owned(), combined_mir_query.clone());

self.register_query(query_name, None, &combined_mir_query, mig.universe());

Expand Down Expand Up @@ -562,12 +573,21 @@ impl SqlIncorporator {
Ok((qfp, mir))
}

pub(super) fn remove_query(&mut self, query_name: &str, mig: &Migration) -> Option<NodeIndex> {
pub(super) fn remove_query(&mut self, query_name: &str, mig: &Migration) -> Option<Vec<NodeIndex>> {
let nodeid = self
.leaf_addresses
.remove(query_name)
.expect("tried to remove unknown query");

if self.compound_mir_queries.contains_key(query_name) {
let mir_query = self.compound_mir_queries.get(query_name).unwrap().clone();
self.compound_mir_queries.remove_entry(query_name);
self.view_schemas.remove(query_name).unwrap();
// potentially would need to remove subqueries from named_queries
self.mir_converter.remove_query(query_name, &mir_query);
return Some(vec![nodeid])
}

let qg_hash = self
.named_queries
.remove(query_name)
Expand All @@ -594,8 +614,8 @@ impl SqlIncorporator {
self.query_graphs.remove(&qg_hash).unwrap();
self.view_schemas.remove(query_name).unwrap();

// trigger reader node removal
Some(nodeid)
// the node has been removed
Some(vec![nodeid])
} else {
// more than one query uses this leaf
// don't remove node yet!
Expand Down Expand Up @@ -660,8 +680,11 @@ impl SqlIncorporator {
self.named_queries.insert(query_name.to_owned(), qg_hash);
}
None => {
self.base_mir_queries
.insert(query_name.to_owned(), mir.clone());
if self.compound_mir_queries.contains_key(query_name) {
debug!(self.log, "Dealing with a compound query {:?}", query_name.to_owned());
} else {
self.base_mir_queries.insert(query_name.to_owned(), mir.clone());
}
}
}
}
Expand Down
35 changes: 35 additions & 0 deletions noria-server/src/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2514,3 +2514,38 @@ fn correct_nested_view_schema() {
];
assert_eq!(q.schema(), Some(&expected_schema[..]));
}

#[test]
fn remove_compound_query() {
let mut g = start_simple("remove_compound_query");
let sql = "
CREATE TABLE answers_a (email_key int, lec int, answer text, PRIMARY KEY (email_key));\
CREATE TABLE answers_b (email_key int, lec int, answer text, PRIMARY KEY (email_key));\
QUERY answers: SELECT email_key, answer FROM answers_a WHERE lec=? UNION SELECT email_key, answer FROM answers_b WHERE lec=?;
";
g.install_recipe(sql).unwrap();
let mut write = g.table("answers_a").unwrap().into_sync();
let mut write2 = g.table("answers_b").unwrap().into_sync();
// insert a new record
write.insert(vec![1.into(), 3.into(), "hello".into()]).unwrap();
write2.insert(vec![2.into(), 3.into(), "goodbye".into()]).unwrap();

g.remove_query("answers");
assert_eq!(g.outputs().unwrap().len(), 0);

let r1_txt = "\
CREATE TABLE answers_c (email_key int, lec int, answer text, PRIMARY KEY (lec));\
QUERY answers: SELECT email_key, answer FROM answers_a WHERE lec=? UNION SELECT email_key, answer FROM answers_b WHERE lec=? UNION SELECT email_key, answer FROM answers_c WHERE lec=?;\
";
g.extend_recipe(r1_txt).unwrap();
assert_eq!(g.outputs().unwrap().len(), 1);
g.remove_query("answers");

let r2_txt = "\
CREATE TABLE answers_d (email_key int, lec int, answer text, PRIMARY KEY (lec));\
QUERY answers: SELECT email_key, answer FROM answers_a WHERE lec=? UNION SELECT email_key, answer FROM answers_b WHERE lec=? UNION SELECT email_key, answer FROM answers_c WHERE lec=? UNION SELECT email_key, answer FROM answers_d WHERE lec=?;\
";
g.extend_recipe(r2_txt).unwrap();
assert_eq!(g.outputs().unwrap().len(), 1);

}
17 changes: 17 additions & 0 deletions noria/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,16 @@ impl<A: Authority + 'static> ControllerHandle<A> {
self.rpc("remove_node", view, "failed to remove node")
}

/// Remove the given query view from the graph.
///
/// `Self::poll_ready` must have returned `Async::Ready` before you call this method.
pub fn remove_query(
&mut self,
view: &str,
) -> impl Future<Item = (), Error = failure::Error> + Send {
self.rpc("remove_query", view, "failed to remove query")
}

/// Construct a synchronous interface to this controller instance using the given executor to
/// execute all operations.
///
Expand Down Expand Up @@ -636,4 +646,11 @@ where
let fut = self.handle()?.remove_node(view);
self.run(fut)
}
/// Remove the given external view from the graph.
///
/// See [`ControllerHandle::remove_node`].
pub fn remove_query(&mut self, view: &str) -> Result<(), failure::Error> {
let fut = self.handle()?.remove_query(view);
self.run(fut)
}
}