Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Renaming spree #6

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
# Plugin for Conversations in OpenSearch
This repo is a WIP plugin for handling conversations in OpenSearch ([Per this RFC](https://github.com/opensearch-project/ml-commons/issues/1150)).
It consists of three components:
- A notion of conversational memory, storage, and a CRUD API. _(This is all that's in here today)_
- A series of search processors and pipelines that calls externally hosted LLMs for inference. _(This might go somewhere else)_
- A simple API that wraps those components for developers who don't want to learn the ins and outs of the most modern version of OpenSearch. _(This might also go somewhere else)_
Currently all that is in here is the CRUD API for conversational memory.

## Progress so far
Currently, we've mostly done the CRUD API for conversational memory - it's in the [conversational-memory](https://github.com/aryn-ai/conversational-opensearch/tree/conversational-memory) branch.
Currently, we've mostly done the CRUD API for conversational memory - the most advanced branch is [renaming-spree](https://github.com/aryn-ai/conversational-opensearch/tree/renaming-spree).
The search pipeline is still a work in progress, and the wrapper API is nonexistant. These parts might not go in this repo.

---
Expand All @@ -24,15 +21,15 @@ Likewise "opensearch" shouldn't be part of a plugin's name, but it's in the titl
Accordingly, all code lives somewhere in the `org.opensearch.conversational` package.

## Tests
The `conversational-memory` branch has, along with the CRUD API, numerous tests.
This repo has, along with the CRUD API, numerous tests.
I'm pretty sure they're not completely comprehensive, but they're pretty good.
Run them with

```
./gradlew check
```

The current implementation against OpenSearch 2.8.0 - there shouldn't be too much incompatibility between versions given what currently exists, but when pipelines come along that assertion will break.
The current implementation against OpenSearch 2.9.0 - there shouldn't be too much incompatibility between versions given what currently exists.



Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ validateNebulaPom.enabled = false
buildscript {

ext {
opensearch_version = System.getProperty("opensearch.version", "2.8.0")
opensearch_version = System.getProperty("opensearch.version", "2.9.0")
}

repositories {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.List;

import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.conversational.index.ConvoMeta;
Expand Down Expand Up @@ -51,7 +52,7 @@ public ConversationalMemoryHandler(Client client, ClusterService clusterService)
* @param listener listener to wait for this op to finish, gets unique id of new conversation
*/
public void createConversation(ActionListener<String> listener) {
convoMetaIndex.addNewConversation(listener);
convoMetaIndex.createConversation(listener);
}

/**
Expand All @@ -60,7 +61,7 @@ public void createConversation(ActionListener<String> listener) {
* @param listener listener to wait for this op to finish, gets unique id of new conversation
*/
public void createConversation(String name, ActionListener<String> listener) {
convoMetaIndex.addNewConversation(name, listener);
convoMetaIndex.createConversation(name, listener);
}

/**
Expand All @@ -73,7 +74,7 @@ public void createConversation(String name, ActionListener<String> listener) {
* @param metadata arbitrary JSON string of extra stuff
* @param listener gets the ID of the new interaction
*/
public void putInteraction(
public void createInteraction(
String conversationId,
String input,
String prompt,
Expand All @@ -84,7 +85,7 @@ public void putInteraction(
) {
Instant time = Instant.now();
convoMetaIndex.hitConversation(conversationId, time, ActionListener.wrap(r->{}, e->{}));
interactionsIndex.addInteraction(
interactionsIndex.createInteraction(
conversationId, input, prompt,
response, agent, metadata, time, listener
);
Expand All @@ -107,20 +108,37 @@ public void getInteractions(String conversationId, int from, int maxResults, Act
* @param maxResults how many conversations to list
* @param listener gets the list of all conversations, sorted by recency
*/
public void listConversations(int from, int maxResults, ActionListener<List<ConvoMeta>> listener) {
convoMetaIndex.listConversations(from, maxResults, listener);
public void getConversations(int from, int maxResults, ActionListener<List<ConvoMeta>> listener) {
convoMetaIndex.getConversations(from, maxResults, listener);
}

/**
* Get all conversations (not the interactions in them, just the headers)
* @param maxResults how many conversations to get
* @param listener receives the list of conversations, sorted by recency
*/
public void listConversations(int maxResults, ActionListener<List<ConvoMeta>> listener) {
convoMetaIndex.listConversations(maxResults, listener);
public void getConversations(int maxResults, ActionListener<List<ConvoMeta>> listener) {
convoMetaIndex.getConversations(maxResults, listener);
}

/**
* Delete a conversation and all of its interactions
* @param conversationId the id of the conversation to delete
* @param listener receives whether the convoMeta object and all of its interactions were deleted. i.e. false => there's something still in an index somewhere
*/
public void deleteConversation(String conversationId, ActionListener<Boolean> listener) {
StepListener<Boolean> metaDeleteListener = new StepListener<>();
StepListener<Boolean> interactionsListener = new StepListener<>();

convoMetaIndex.deleteConversation(conversationId, metaDeleteListener);
interactionsIndex.deleteConversation(conversationId, interactionsListener);

metaDeleteListener.whenComplete(metaResult -> {
interactionsListener.whenComplete(interactionResult -> {
listener.onResponse(metaResult && interactionResult);
}, listener::onFailure);
}, listener::onFailure);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,18 @@
import org.opensearch.conversational.action.memory.conversation.CreateConversationAction;
import org.opensearch.conversational.action.memory.conversation.CreateConversationRestAction;
import org.opensearch.conversational.action.memory.conversation.CreateConversationTransportAction;
import org.opensearch.conversational.action.memory.conversation.ListConversationsAction;
import org.opensearch.conversational.action.memory.conversation.ListConversationsRestAction;
import org.opensearch.conversational.action.memory.conversation.ListConversationsTransportAction;
import org.opensearch.conversational.action.memory.conversation.DeleteConversationAction;
import org.opensearch.conversational.action.memory.conversation.DeleteConversationRestAction;
import org.opensearch.conversational.action.memory.conversation.DeleteConversationTransportAction;
import org.opensearch.conversational.action.memory.conversation.GetConversationsAction;
import org.opensearch.conversational.action.memory.conversation.GetConversationsRestAction;
import org.opensearch.conversational.action.memory.conversation.GetConversationsTransportAction;
import org.opensearch.conversational.action.memory.interaction.GetInteractionsAction;
import org.opensearch.conversational.action.memory.interaction.GetInteractionsRestAction;
import org.opensearch.conversational.action.memory.interaction.GetInteractionsTransportAction;
import org.opensearch.conversational.action.memory.interaction.PutInteractionAction;
import org.opensearch.conversational.action.memory.interaction.PutInteractionRestAction;
import org.opensearch.conversational.action.memory.interaction.PutInteractionTransportAction;
import org.opensearch.conversational.action.memory.interaction.CreateInteractionAction;
import org.opensearch.conversational.action.memory.interaction.CreateInteractionRestAction;
import org.opensearch.conversational.action.memory.interaction.CreateInteractionTransportAction;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
Expand All @@ -69,9 +72,10 @@ public class ConversationalPlugin extends Plugin implements ActionPlugin {
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return List.of(
new ActionHandler<>(CreateConversationAction.INSTANCE, CreateConversationTransportAction.class),
new ActionHandler<>(ListConversationsAction.INSTANCE, ListConversationsTransportAction.class),
new ActionHandler<>(PutInteractionAction.INSTANCE, PutInteractionTransportAction.class),
new ActionHandler<>(GetInteractionsAction.INSTANCE, GetInteractionsTransportAction.class)
new ActionHandler<>(GetConversationsAction.INSTANCE, GetConversationsTransportAction.class),
new ActionHandler<>(CreateInteractionAction.INSTANCE, CreateInteractionTransportAction.class),
new ActionHandler<>(GetInteractionsAction.INSTANCE, GetInteractionsTransportAction.class),
new ActionHandler<>(DeleteConversationAction.INSTANCE, DeleteConversationTransportAction.class)
);
}

Expand Down Expand Up @@ -108,14 +112,16 @@ public List<RestHandler> getRestHandlers(
Supplier<DiscoveryNodes> nodesInCluster
) {
CreateConversationRestAction restCreateConversation = new CreateConversationRestAction();
ListConversationsRestAction restListConversations = new ListConversationsRestAction();
PutInteractionRestAction restCreateInteraction = new PutInteractionRestAction();
GetConversationsRestAction restListConversations = new GetConversationsRestAction();
CreateInteractionRestAction restCreateInteraction = new CreateInteractionRestAction();
GetInteractionsRestAction restListInteractions = new GetInteractionsRestAction();
DeleteConversationRestAction restDeleteConversation = new DeleteConversationRestAction();
return List.of(
restCreateConversation,
restListConversations,
restCreateInteraction,
restListInteractions
restListInteractions,
restDeleteConversation
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public class ActionConstants {
public final static String AI_AGENT_FIELD = "agent";
/** name of interaction attributes field in all requests */
public final static String INTER_ATTRIBUTES_FIELD = "attributes";
/** name of success field in all requests */
public final static String SUCCESS_FIELD = "success";

/** path for create conversation */
public final static String CREATE_CONVERSATION_PATH = "/_plugins/conversational/memory";
Expand All @@ -57,6 +59,8 @@ public class ActionConstants {
public final static String CREATE_INTERACTION_PATH = "/_plugins/conversational/memory/{conversationId}";
/** path for get interactions */
public final static String GET_INTERACTIONS_PATH = "/_plugins/conversational/memory/{conversationId}";
/** path for delete conversation */
public final static String DELETE_CONVERSATION_PATH = "/_plugins/conversational/memory/{conversationId}";

/** default max results returned by get operations */
public final static int DEFAULT_MAX_RESULTS = 10;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Aryn, Inc 2023
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opensearch.conversational.action.memory.conversation;

import org.opensearch.action.ActionType;

/**
* Action for deleting a conversation from conversational memory
*/
public class DeleteConversationAction extends ActionType<DeleteConversationResponse> {
/** Instance of this */
public static final DeleteConversationAction INSTANCE = new DeleteConversationAction();
/** Name of this action - has something to do with security maybe */
public static final String NAME = "cluster:admin/opensearch/conversational/conversation/delete";

private DeleteConversationAction() {super(NAME, DeleteConversationResponse::new);}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright Aryn, Inc 2023
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opensearch.conversational.action.memory.conversation;

import java.io.IOException;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.conversational.action.ActionConstants;
import org.opensearch.rest.RestRequest;

import static org.opensearch.action.ValidateActions.addValidationError;

/**
* Action Request for Delete Conversation
*/
public class DeleteConversationRequest extends ActionRequest {
private String conversationId;

/**
* Constructor
* @param in input stream, assumes one of these requests was written to it
* @throws IOException if something breaks
*/
public DeleteConversationRequest(StreamInput in) throws IOException {
super(in);
this.conversationId = in.readString();
}

/**
* Constructor
* @param conversationId id of the conversation to be deleted
*/
public DeleteConversationRequest(String conversationId) {
this.conversationId = conversationId;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(conversationId);
}

/**
* Get the conversation id of the conversation to be deleted
* @return the id of the conversation to be deleted
*/
public String getId() {
return conversationId;
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException exception = null;
if (conversationId == null) {
exception = addValidationError("conversation id must not be null", exception);
}
return exception;
}

/**
* Create a new DeleteConversationRequest from a RestRequest
* @param request RestRequest representing a DeleteConversationRequest
* @return a new DeleteConversationRequest
* @throws IOException if something breaks
*/
public static DeleteConversationRequest fromRestRequest(RestRequest request) throws IOException {
String cid = request.param(ActionConstants.CONVO_ID_FIELD);
return new DeleteConversationRequest(cid);
}

}
Loading
Loading