diff --git a/requirements.txt b/requirements.txt index 925cf46b5..59c012741 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,8 @@ aiohttp -BingImageCreator +BingImageCreator-plus certifi httpx prompt_toolkit requests rich +Brotli \ No newline at end of file diff --git a/run.py b/run.py new file mode 100644 index 000000000..8a0503368 --- /dev/null +++ b/run.py @@ -0,0 +1,28 @@ +import asyncio +import json + +from src.EdgeGPT.EdgeGPT import Chatbot, ConversationStyle + + +async def async_main() -> None: + """ + Main function + """ + print("Initializing...") + print("Enter `alt+enter` or `escape+enter` to send a message") + # Read and parse cookies + cookies = json.loads( + open("C:\\Users\\Codete\\.config\\bing-cookies.json", encoding="utf-8").read() + ) + bot = await Chatbot.create(cookies=cookies) + response = await bot.ask( + prompt="please describe the picture", + conversation_style=ConversationStyle.balanced, + simplify_response=False, + attachment="C:\\Users\\Codete\\OneDrive\\Pictures\\Ffrc-8-XoAcItDo.jpg", + ) + await bot.close() + print(json.dumps(response, indent=2)) + + +asyncio.run(async_main()) diff --git a/setup.py b/setup.py index 08b012e97..c1a840b6a 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,6 @@ from pathlib import Path -from setuptools import find_packages -from setuptools import setup +from setuptools import find_packages, setup DOCS_PATH = Path(__file__).parents[0] / "docs/README.md" PATH = Path("README.md") @@ -11,16 +10,16 @@ f2.write(f1.read()) setup( - name="EdgeGPT", - version="0.13.2", + name="EdgeGPT-plus", + version="0.13.6", license="The Unlicense", - author="Antonio Cheong", + author="Antonio Cheong + others", author_email="acheong@student.dalat.org", description="Reverse engineered Edge Chat API", packages=find_packages("src"), package_dir={"": "src"}, - url="https://github.com/acheong08/EdgeGPT", - project_urls={"Bug Report": "https://github.com/acheong08/EdgeGPT/issues/new"}, + url="https://github.com/brainboost/EdgeGPT", + project_urls={"Bug Report": "https://github.com/brainboost/EdgeGPT/issues/new"}, entry_points={ "console_scripts": [ "edge-gpt = EdgeGPT.main:main", @@ -35,7 +34,7 @@ "certifi", "prompt_toolkit", "requests", - "BingImageCreator>=0.4.4", + "BingImageCreator-plus>=0.4.4", ], long_description=Path.open(PATH, encoding="utf-8").read(), long_description_content_type="text/markdown", diff --git a/src/EdgeGPT/EdgeGPT.py b/src/EdgeGPT/EdgeGPT.py index 96564f1c6..64544079a 100644 --- a/src/EdgeGPT/EdgeGPT.py +++ b/src/EdgeGPT/EdgeGPT.py @@ -4,7 +4,6 @@ from __future__ import annotations import json -from pathlib import Path from typing import Generator from .chathub import * @@ -52,6 +51,9 @@ async def save_conversation(self, filename: str) -> None: with open(filename, "w") as f: conversation_id = self.chat_hub.request.conversation_id conversation_signature = self.chat_hub.request.conversation_signature + encrypted_conversation_signature = ( + self.chat_hub.request.encrypted_conversation_signature + ) client_id = self.chat_hub.request.client_id invocation_id = self.chat_hub.request.invocation_id f.write( @@ -59,6 +61,7 @@ async def save_conversation(self, filename: str) -> None: { "conversation_id": conversation_id, "conversation_signature": conversation_signature, + "encrypted_conversation_signature": encrypted_conversation_signature, "client_id": client_id, "invocation_id": invocation_id, }, @@ -73,6 +76,9 @@ async def load_conversation(self, filename: str) -> None: conversation = json.load(f) self.chat_hub.request = ChatHubRequest( conversation_signature=conversation["conversation_signature"], + encrypted_conversation_signature=conversation[ + "encrypted_conversation_signature" + ], client_id=conversation["client_id"], conversation_id=conversation["conversation_id"], invocation_id=conversation["invocation_id"], @@ -93,12 +99,14 @@ async def get_activity(self) -> dict: async def ask( self, prompt: str, - wss_link: str = "wss://sydney.bing.com/sydney/ChatHub", conversation_style: CONVERSATION_STYLE_TYPE = None, webpage_context: str | None = None, search_result: bool = False, locale: str = guess_locale(), simplify_response: bool = False, + attachment: str | None = None, + mode: str = None, + no_search: bool = True, ) -> dict: """ Ask a question to the bot @@ -116,10 +124,12 @@ async def ask( async for final, response in self.chat_hub.ask_stream( prompt=prompt, conversation_style=conversation_style, - wss_link=wss_link, webpage_context=webpage_context, search_result=search_result, locale=locale, + attachment=attachment, + mode=mode, + no_search=no_search, ): if final: if not simplify_response: @@ -132,13 +142,19 @@ async def ask( ) if messages_left == 0: raise Exception("Max messages reached") - message = "" + message: dict | None = None + response_text = "" for msg in reversed(response["item"]["messages"]): - if msg.get("adaptiveCards") and msg["adaptiveCards"][0]["body"][ - 0 - ].get("text"): - message = msg - break + if msg.get("adaptiveCards"): + body = msg["adaptiveCards"][0]["body"] + for block in body: + if ( + block.get("type") == "TextBlock" + ): # the first block could be type: Image + response_text = block.get("text") + if response_text: + message = msg + break if not message: raise Exception("No message found") suggestions = [ @@ -146,12 +162,8 @@ async def ask( for suggestion in message.get("suggestedResponses", []) ] adaptive_cards = message.get("adaptiveCards", []) - adaptive_text = ( - adaptive_cards[0]["body"][0].get("text") if adaptive_cards else None - ) - sources = ( - adaptive_cards[0]["body"][0].get("text") if adaptive_cards else None - ) + adaptive_text = response_text if adaptive_cards else None + sources = response_text if adaptive_cards else None sources_text = ( adaptive_cards[0]["body"][-1].get("text") if adaptive_cards @@ -174,12 +186,14 @@ async def ask( async def ask_stream( self, prompt: str, - wss_link: str = "wss://sydney.bing.com/sydney/ChatHub", conversation_style: CONVERSATION_STYLE_TYPE = None, raw: bool = False, webpage_context: str | None = None, search_result: bool = False, locale: str = guess_locale(), + attachment: str | None = None, + mode: str = None, + no_search: bool = True, ) -> Generator[bool, dict | str, None]: """ Ask a question to the bot @@ -192,6 +206,9 @@ async def ask_stream( webpage_context=webpage_context, search_result=search_result, locale=locale, + attachment=attachment, + mode=mode, + no_search=no_search, ): yield response @@ -205,6 +222,7 @@ async def delete_conversation( self, conversation_id: str = None, conversation_signature: str = None, + encrypted_conversation_signature: str = None, client_id: str = None, ) -> None: """ @@ -213,6 +231,7 @@ async def delete_conversation( await self.chat_hub.delete_conversation( conversation_id=conversation_id, conversation_signature=conversation_signature, + encrypted_conversation_signature=encrypted_conversation_signature, client_id=client_id, ) diff --git a/src/EdgeGPT/EdgeUtils.py b/src/EdgeGPT/EdgeUtils.py index 09dcf7f5c..a5969ebe6 100644 --- a/src/EdgeGPT/EdgeUtils.py +++ b/src/EdgeGPT/EdgeUtils.py @@ -6,8 +6,7 @@ from log2d import Log -from .EdgeGPT import Chatbot -from .EdgeGPT import ConversationStyle +from .EdgeGPT import Chatbot, ConversationStyle from .ImageGen import ImageGen Log("BingChat") @@ -148,7 +147,7 @@ def __init__( if isinstance(x, (str, Path)) and x } Cookie.supplied_files = cookie_files - files = Cookie.files() # includes .supplied_files + Cookie.files() # includes .supplied_files if Cookie.rotate_cookies: Cookie.import_next() else: @@ -348,7 +347,7 @@ def test_cookie_rotation() -> None: log(f"Cookie count: {Cookie.request_count.get(Cookie.current_file_path.name)}") -def test_features() -> Query: +def test_features(i) -> Query: try: q = Query( f"What is {i} in Roman numerals? Give the answer in JSON", diff --git a/src/EdgeGPT/chathub.py b/src/EdgeGPT/chathub.py index 3f5a92e5e..2b04494c7 100644 --- a/src/EdgeGPT/chathub.py +++ b/src/EdgeGPT/chathub.py @@ -3,25 +3,22 @@ import os import ssl import sys +import urllib.parse +from base64 import b64encode from time import time -from typing import Generator -from typing import List -from typing import Union +from typing import Generator, List, Union import aiohttp import certifi import httpx from BingImageCreator import ImageGenAsync +from curl_cffi import requests -from .constants import DELIMITER -from .constants import HEADERS -from .constants import HEADERS_INIT_CONVER +from .constants import DELIMITER, HEADERS, HEADERS_INIT_CONVER, KBLOB_HEADERS from .conversation import Conversation from .conversation_style import CONVERSATION_STYLE_TYPE from .request import ChatHubRequest -from .utilities import append_identifier -from .utilities import get_ran_hex -from .utilities import guess_locale +from .utilities import append_identifier, cookies_to_dict, guess_locale ssl_context = ssl.create_default_context() ssl_context.load_verify_locations(certifi.where()) @@ -40,6 +37,9 @@ def __init__( self.task: asyncio.Task self.request = ChatHubRequest( conversation_signature=conversation.struct["conversationSignature"], + encrypted_conversation_signature=conversation.struct[ + "encryptedConversationSignature" + ], client_id=conversation.struct["clientId"], conversation_id=conversation.struct["conversationId"], ) @@ -60,21 +60,12 @@ def __init__( timeout=900, headers=HEADERS_INIT_CONVER, ) - - async def get_conversation( - self, - conversation_id: str = None, - conversation_signature: str = None, - client_id: str = None, - ) -> dict: - conversation_id = conversation_id or self.request.conversation_id - conversation_signature = ( - conversation_signature or self.request.conversation_signature - ) - client_id = client_id or self.request.client_id - url = f"https://sydney.bing.com/sydney/GetConversation?conversationId={conversation_id}&source=cib&participantId={client_id}&conversationSignature={conversation_signature}&traceId={get_ran_hex()}" - response = await self.session.get(url) - return response.json() + if conversation.struct.get("encryptedConversationSignature"): + self.encrypted_conversation_signature = conversation.struct[ + "encryptedConversationSignature" + ] + else: + self.encrypted_conversation_signature = None async def get_activity(self) -> dict: url = "https://www.bing.com/turing/conversation/chats" @@ -96,21 +87,29 @@ async def ask_stream( webpage_context: Union[str, None] = None, search_result: bool = False, locale: str = guess_locale(), + attachment: Union[str, None] = None, + mode: str = None, + no_search: bool = True, ) -> Generator[bool, Union[dict, str], None]: """ """ - cookies = {} - if self.cookies is not None: - for cookie in self.cookies: - cookies[cookie["name"]] = cookie["value"] - self.aio_session = aiohttp.ClientSession(cookies=cookies) + attachment_info = None + if attachment: + attachment_info = await self.upload_attachment(attachment) + + if self.request.encrypted_conversation_signature is not None: + wss_link = wss_link or "wss://sydney.bing.com/sydney/ChatHub" + wss_link += f"?sec_access_token={urllib.parse.quote(self.request.encrypted_conversation_signature)}" + self.aio_session = aiohttp.ClientSession(cookies=cookies_to_dict(self.cookies)) # Check if websocket is closed wss = await self.aio_session.ws_connect( wss_link or "wss://sydney.bing.com/sydney/ChatHub", ssl=ssl_context, headers=HEADERS, proxy=self.proxy, + timeout=30.0, ) await self._initial_handshake(wss) + # Construct a ChatHub request self.request.update( prompt=prompt, @@ -118,6 +117,9 @@ async def ask_stream( webpage_context=webpage_context, search_result=search_result, locale=locale, + attachment_info=attachment_info, + mode=mode, + no_search=no_search, ) # Send request await wss.send_str(append_identifier(self.request.struct)) @@ -143,101 +145,111 @@ async def ask_stream( if obj is None or not obj: continue response = json.loads(obj) - # print(response) - if response.get("type") == 1 and response["arguments"][0].get( - "messages", - ): - if not draw: - if ( - response["arguments"][0]["messages"][0].get( - "messageType", - ) - == "GenerateContentQuery" - ): - try: - async with ImageGenAsync( - all_cookies=self.cookies, - ) as image_generator: - images = await image_generator.get_images( - response["arguments"][0]["messages"][0]["text"], - ) - for i, image in enumerate(images): - resp_txt = f"{resp_txt}\n![image{i}]({image})" - draw = True - except Exception as e: - print(e) - continue - if ( - ( - response["arguments"][0]["messages"][0]["contentOrigin"] - != "Apology" + try: + if response.get("type") == 1 and response["arguments"][0].get( + "messages", + ): + if not draw: + msg_type = response["arguments"][0]["messages"][0].get( + "messageType" ) - and not draw - and not raw - ): - resp_txt = result_text + response["arguments"][0][ - "messages" - ][0]["adaptiveCards"][0]["body"][0].get("text", "") - resp_txt_no_link = result_text + response["arguments"][0][ - "messages" - ][0].get("text", "") - if response["arguments"][0]["messages"][0].get( - "messageType", - ): - resp_txt = ( - resp_txt - + response["arguments"][0]["messages"][0][ - "adaptiveCards" - ][0]["body"][0]["inlines"][0].get("text") - + "\n" + if msg_type == "GenerateContentQuery": + try: + async with ImageGenAsync( + all_cookies=self.cookies, + ) as image_generator: + images = await image_generator.get_images( + response["arguments"][0]["messages"][0][ + "text" + ], + ) + for i, image in enumerate(images): + resp_txt = f"{resp_txt}\n![image{i}]({image})" + draw = True + except Exception as e: + print(e) + continue + if ( + ( + response["arguments"][0]["messages"][0][ + "contentOrigin" + ] + != "Apology" ) - result_text = ( - result_text - + response["arguments"][0]["messages"][0][ - "adaptiveCards" - ][0]["body"][0]["inlines"][0].get("text") - + "\n" + and ( + response["arguments"][0]["messages"][0].get( + "messageType" + ) + != "AdsQuery" ) + and not draw + and not raw + ): + body = response["arguments"][0]["messages"][0][ + "adaptiveCards" + ][0]["body"][0] + resp_txt = result_text + body.get("text", "") + resp_txt_no_link = result_text + response["arguments"][ + 0 + ]["messages"][0].get("text", "") + if msg_type and "inlines" in body: + resp_txt = ( + resp_txt + body["inlines"][0].get("text") + "\n" + ) + result_text = ( + result_text + + response["arguments"][0]["messages"][0][ + "adaptiveCards" + ][0]["body"][0]["inlines"][0].get("text") + + "\n" + ) if not raw: yield False, resp_txt - elif response.get("type") == 2: - if response["item"]["result"].get("error"): - await self.close() - raise Exception( - f"{response['item']['result']['value']}: {response['item']['result']['message']}", - ) - if draw: - cache = response["item"]["messages"][1]["adaptiveCards"][0][ - "body" - ][0]["text"] - response["item"]["messages"][1]["adaptiveCards"][0]["body"][0][ - "text" - ] = (cache + resp_txt) - if ( - response["item"]["messages"][-1]["contentOrigin"] == "Apology" - and resp_txt - ): - response["item"]["messages"][-1]["text"] = resp_txt_no_link - response["item"]["messages"][-1]["adaptiveCards"][0]["body"][0][ - "text" - ] = resp_txt - print( - "Preserved the message from being deleted", - file=sys.stderr, - ) - await wss.close() - if not self.aio_session.closed: - await self.aio_session.close() - yield True, response - return - if response.get("type") != 2: - if response.get("type") == 6: - await wss.send_str(append_identifier({"type": 6})) - elif response.get("type") == 7: - await wss.send_str(append_identifier({"type": 7})) - elif raw: - yield False, response + elif response.get("type") == 2: + if response["item"]["result"].get("error"): + await self.close() + raise Exception( + f"{response['item']['result']['value']}: {response['item']['result']['message']}", + ) + if draw: + cache = response["item"]["messages"][1][ + "adaptiveCards" + ][0]["body"][0]["text"] + response["item"]["messages"][1]["adaptiveCards"][0][ + "body" + ][0]["text"] = cache + resp_txt + if ( + response["item"]["messages"][-1]["contentOrigin"] + == "Apology" + and resp_txt + ): + response["item"]["messages"][-1][ + "text" + ] = resp_txt_no_link + response["item"]["messages"][-1]["adaptiveCards"][0][ + "body" + ][0]["text"] = resp_txt + print( + "Preserved the message from being deleted", + file=sys.stderr, + ) + await wss.close() + if not self.aio_session.closed: + await self.aio_session.close() + yield True, response + return + if response.get("type") != 2: + if response.get("type") == 6: + await wss.send_str(append_identifier({"type": 6})) + elif response.get("type") == 7: + await wss.send_str(append_identifier({"type": 7})) + elif raw: + yield False, response + except Exception as e: + print(e) + print(response) + continue async def _initial_handshake(self, wss) -> None: await wss.send_str(append_identifier({"protocol": "json", "version": 1})) @@ -248,12 +260,17 @@ async def delete_conversation( self, conversation_id: str = None, conversation_signature: str = None, + encrypted_conversation_signature: str = None, client_id: str = None, ) -> None: conversation_id = conversation_id or self.request.conversation_id conversation_signature = ( conversation_signature or self.request.conversation_signature ) + encrypted_conversation_signature = ( + encrypted_conversation_signature + or self.request.encrypted_conversation_signature + ) client_id = client_id or self.request.client_id url = "https://sydney.bing.com/sydney/DeleteSingleConversation" await self.session.post( @@ -261,6 +278,7 @@ async def delete_conversation( json={ "conversationId": conversation_id, "conversationSignature": conversation_signature, + "encryptedConversationSignature": encrypted_conversation_signature, "participant": {"id": client_id}, "source": "cib", "optionsSets": ["autosave"], @@ -269,3 +287,53 @@ async def delete_conversation( async def close(self) -> None: await self.session.aclose() + + def _build_upload_arguments( + self, attachment: str, image_base64: bytes | None = None + ) -> aiohttp.FormData: + data = aiohttp.FormData() + payload = { + "imageInfo": {"url": attachment}, + "knowledgeRequest": { + "invokedSkills": ["ImageById"], + "subscriptionId": "Bing.Chat.Multimodal", + "invokedSkillsRequestData": {"enableFaceBlur": False}, + "convoData": { + "convoid": self.request.conversation_id, + "convotone": "creative", + }, + }, + } + data.add_field( + "knowledgeRequest", json.dumps(payload), content_type="application/json" + ) + if image_base64: + data.add_field( + "imageBase64", image_base64, content_type="application/octet-stream" + ) + return data + + async def upload_attachment(self, attachment: str) -> dict: + image_base64 = None + with open(attachment, "rb") as file: + image_base64 = b64encode(file.read()) + use_proxy = self.proxy is None + session = aiohttp.ClientSession( + headers=KBLOB_HEADERS, + cookies=cookies_to_dict(self.cookies), + trust_env=use_proxy, + connector=aiohttp.TCPConnector(verify_ssl=False) if use_proxy else None, + ) + data = self._build_upload_arguments(attachment, image_base64) + async with session.post( + "https://www.bing.com/images/kblob", data=data + ) as response: + if response.status != 200: + raise RuntimeError( + f"Failed to upload image, received status: {response.status}" + ) + response_dict = await response.json() + if not response_dict["blobId"] or len(response_dict["blobId"]) == 0: + raise RuntimeError("Failed to parse image info") + await session.close() + return response_dict diff --git a/src/EdgeGPT/constants.py b/src/EdgeGPT/constants.py index d5426724e..f34391a8f 100644 --- a/src/EdgeGPT/constants.py +++ b/src/EdgeGPT/constants.py @@ -7,48 +7,68 @@ take_ip_socket.close() DELIMITER = "\x1e" +# Generate random IP between range 13.104.0.0/14 +# FORWARDED_IP = f"1.0.0.{random.randint(0, 255)}" +USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.82" HEADERS = { "accept": "application/json", - "accept-language": "en-US;q=0.9", + "accept-language": "en-US,en;q=0.9", "accept-encoding": "gzip, deflate, br, zsdch", "content-type": "application/json", - "sec-ch-ua": '"Not/A)Brand";v="99", "Microsoft Edge";v="115", "Chromium";v="115"', + "sec-ch-ua": '"Not.A/Brand";v="8", "Chromium";v="114", "Microsoft Edge";v="114"', "sec-ch-ua-arch": '"x86"', "sec-ch-ua-bitness": '"64"', - "sec-ch-ua-full-version": '"115.0.1901.188"', - "sec-ch-ua-full-version-list": '"Not/A)Brand";v="99.0.0.0", "Microsoft Edge";v="115.0.1901.188", "Chromium";v="115.0.5790.114"', + "sec-ch-ua-full-version": '"114.0.1823.82"', + "sec-ch-ua-full-version-list": '"Not.A/Brand";v="8.0.0.0", "Chromium";v="114.0.5735.201", "Microsoft Edge";v="114.0.1823.82"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-model": "", "sec-ch-ua-platform": '"Windows"', - "sec-ch-ua-platform-version": '"15.0.0"', + "sec-ch-ua-platform-version": '"14.0.0"', "sec-fetch-dest": "empty", "sec-fetch-mode": "cors", "sec-fetch-site": "same-origin", - "sec-ms-gec-version": "1-115.0.1901.188", + "sec-ms-gec-version": "1-114.0.1823.82", "x-ms-client-request-id": str(uuid.uuid4()), "x-ms-useragent": "azsdk-js-api-client-factory/1.0.0-beta.1 core-rest-pipeline/1.10.3 OS/Windows", - "Referer": "https://www.bing.com/search?", - "Referrer-Policy": "origin-when-cross-origin", + "referer": "https://www.bing.com/search?q=Bing+AI&showconv=1&FORM=hpcodx", + "referrer-policy": "origin-when-cross-origin", "x-forwarded-for": FORWARDED_IP, } HEADERS_INIT_CONVER = { "authority": "www.bing.com", "accept": "application/json", - "accept-language": "en-US;q=0.9", + "accept-language": "en-US,en;q=0.9", "cache-control": "max-age=0", - "sec-ch-ua": '"Not/A)Brand";v="99", "Microsoft Edge";v="115", "Chromium";v="115"', + "sec-ch-ua": '"Not.A/Brand";v="8", "Chromium";v="114", "Microsoft Edge";v="114"', "sec-ch-ua-arch": '"x86"', "sec-ch-ua-bitness": '"64"', - "sec-ch-ua-full-version": '"115.0.1901.188"', - "sec-ch-ua-full-version-list": '"Not/A)Brand";v="99.0.0.0", "Microsoft Edge";v="115.0.1901.188", "Chromium";v="115.0.5790.114"', + "sec-ch-ua-full-version": '"114.0.1823.82"', + "sec-ch-ua-full-version-list": '"Not.A/Brand";v="8.0.0.0", "Chromium";v="114.0.5735.201", "Microsoft Edge";v="114.0.1823.82"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-model": '""', "sec-ch-ua-platform": '"Windows"', - "sec-ch-ua-platform-version": '"15.0.0"', + "sec-ch-ua-platform-version": '"14.0.0"', "upgrade-insecure-requests": "1", - "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36 Edg/115.0.1901.188", + "user-agent": USER_AGENT, + "x-edge-shopping-flag": "1", + "x-forwarded-for": FORWARDED_IP, +} + +KBLOB_HEADERS = { + "accept": "image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8", + "accept-encoding": "gzip, deflate, br", + "accept-language": "en-US,en;q=0.9", + "content-type": "multipart/form-data", + "referer": "https://www.bing.com/search?q=Bing+AI&showconv=1&FORM=hpcodx", + "sec-ch-ua": '"Not.A/Brand";v="8", "Chromium";v="114", "Microsoft Edge";v="114"', + "sec-ch-ua-mobile": "?0", + "sec-ch-ua-platform": '"Windows"', + "sec-fetch-dest": "empty", + "sec-fetch-mode": "cors", + "sec-fetch-site": "same-origin", + "user-agent": USER_AGENT, "x-edge-shopping-flag": "1", "x-forwarded-for": FORWARDED_IP, } diff --git a/src/EdgeGPT/conversation.py b/src/EdgeGPT/conversation.py index 99008bc0f..838bd4d7a 100644 --- a/src/EdgeGPT/conversation.py +++ b/src/EdgeGPT/conversation.py @@ -1,7 +1,6 @@ import json import os -from typing import List -from typing import Union +from typing import List, Union import httpx @@ -55,6 +54,14 @@ def __init__( raise Exception("Authentication failed") try: self.struct = response.json() + print(self.struct) + if self.struct.get("conversationSignature") is None: + self.struct["conversationSignature"] = response.headers[ + "X-Sydney-Conversationsignature" + ] + self.struct["encryptedConversationSignature"] = response.headers[ + "X-Sydney-Encryptedconversationsignature" + ] except (json.decoder.JSONDecodeError, NotAllowedToAccess) as exc: raise Exception( "Authentication failed. You have not been accepted into the beta.", @@ -91,7 +98,8 @@ async def create( if cookies: formatted_cookies = httpx.Cookies() for cookie in cookies: - formatted_cookies.set(cookie["name"], cookie["value"]) + if cookie["name"] in ["_U", "MUID"]: + formatted_cookies.set(cookie["name"], cookie["value"]) async with httpx.AsyncClient( proxies=proxy, timeout=30, @@ -112,11 +120,18 @@ async def create( raise Exception("Authentication failed") try: self.struct = response.json() + if self.struct.get("conversationSignature") is None: + self.struct["conversationSignature"] = response.headers[ + "X-Sydney-Conversationsignature" + ] + self.struct["encryptedConversationSignature"] = response.headers[ + "X-Sydney-Encryptedconversationsignature" + ] except (json.decoder.JSONDecodeError, NotAllowedToAccess) as exc: - print(response.text) raise Exception( "Authentication failed. You have not been accepted into the beta.", ) from exc if self.struct["result"]["value"] == "UnauthorizedRequest": - raise NotAllowedToAccess(self.struct["result"]["message"]) + print(self.struct) + raise NotAllowedToAccess(self.struct["result"]["value"]) return self diff --git a/src/EdgeGPT/conversation_style.py b/src/EdgeGPT/conversation_style.py index a5c54d118..f9831e502 100644 --- a/src/EdgeGPT/conversation_style.py +++ b/src/EdgeGPT/conversation_style.py @@ -6,49 +6,26 @@ from typing_extensions import Literal from typing import Optional +_BASE_OPTION_SETS = [ + "nlu_direct_response_filter", + "deepleo", + "disable_emoji_spoken_text", + "responsible_ai_policy_235", + "enablemm", + "iycapbing", + "iyxapbing", + "dv3sugg", + "iyoloxap", + "iyoloneutral", + "gencontentv3", + "nojbf", +] class ConversationStyle(Enum): - creative = [ - "nlu_direct_response_filter", - "deepleo", - "disable_emoji_spoken_text", - "responsible_ai_policy_235", - "enablemm", - "h3imaginative", - "objopinion", - "dsblhlthcrd", - "dv3sugg", - "autosave", - "clgalileo", - "gencontentv3", - ] - balanced = [ - "nlu_direct_response_filter", - "deepleo", - "disable_emoji_spoken_text", - "responsible_ai_policy_235", - "enablemm", - "galileo", - "saharagenconv5", - "objopinion", - "dsblhlthcrd", - "dv3sugg", - "autosave", - ] - precise = [ - "nlu_direct_response_filter", - "deepleo", - "disable_emoji_spoken_text", - "responsible_ai_policy_235", - "enablemm", - "h3precise", - "objopinion", - "dsblhlthcrd", - "dv3sugg", - "autosave", - "clgalileo", - "gencontentv3", - ] + creative = _BASE_OPTION_SETS + ["h3imaginative"] + balanced = _BASE_OPTION_SETS + ["galileo"] + precise = _BASE_OPTION_SETS + ["h3precise"] + CONVERSATION_STYLE_TYPE = Optional[ diff --git a/src/EdgeGPT/locale.py b/src/EdgeGPT/locale.py index 06a422b02..5f8776294 100644 --- a/src/EdgeGPT/locale.py +++ b/src/EdgeGPT/locale.py @@ -48,14 +48,14 @@ class LocationHint(Enum): "locale": "en-IE", "LocationHint": [ { - "country": "Norway", + "country": "Ireland", "state": "", - "city": "Oslo", + "city": "Dublin", "timezoneoffset": 1, "countryConfidence": 8, "Center": { - "Latitude": 59.9139, - "Longitude": 10.7522, + "Latitude": 53.2732, + "Longitude": -6.2109, }, "RegionType": 2, "SourceType": 1, diff --git a/src/EdgeGPT/request.py b/src/EdgeGPT/request.py index 70249bc44..99f8dd896 100644 --- a/src/EdgeGPT/request.py +++ b/src/EdgeGPT/request.py @@ -2,17 +2,17 @@ from datetime import datetime from typing import Union -from .conversation_style import CONVERSATION_STYLE_TYPE -from .conversation_style import ConversationStyle -from .utilities import get_location_hint_from_locale -from .utilities import get_ran_hex -from .utilities import guess_locale +from .conversation_style import CONVERSATION_STYLE_TYPE, ConversationStyle +from .utilities import get_location_hint_from_locale, get_ran_hex, guess_locale + +BING_BLOB_URL = "https://copilot.microsoft.com/images/blob?bcid=" class ChatHubRequest: def __init__( self, conversation_signature: str, + encrypted_conversation_signature: str, client_id: str, conversation_id: str, invocation_id: int = 3, @@ -22,6 +22,7 @@ def __init__( self.client_id: str = client_id self.conversation_id: str = conversation_id self.conversation_signature: str = conversation_signature + self.encrypted_conversation_signature: str = encrypted_conversation_signature self.invocation_id: int = invocation_id def update( @@ -31,6 +32,9 @@ def update( webpage_context: Union[str, None] = None, search_result: bool = False, locale: str = guess_locale(), + attachment_info: dict | None = None, + mode: str = None, + no_search: bool = True, ) -> None: options = [ "deepleo", @@ -42,6 +46,13 @@ def update( if not isinstance(conversation_style, ConversationStyle): conversation_style = getattr(ConversationStyle, conversation_style) options = conversation_style.value + #enable gpt4_turbo + if mode == 'gpt4-turbo': + options.append('dlgpt4t') + + if no_search: + options.append('nosearchall') + message_id = str(uuid.uuid4()) # Get the current local time now_local = datetime.now() @@ -61,10 +72,17 @@ def update( # Get current time timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S") + offset_string + image_url, original_image_url = None, None + + if attachment_info: + image_url = BING_BLOB_URL + attachment_info["blobId"] + original_image_url = BING_BLOB_URL + attachment_info["blobId"] + self.struct = { "arguments": [ { "source": "cib", + "scenario": "SERP", "optionsSets": options, "allowedMessageTypes": [ "ActionRequest", @@ -83,26 +101,26 @@ def update( ], "sliceIds": [ "winmuid1tf", - "styleoff", - "ccadesk", - "smsrpsuppv4cf", - "ssrrcache", - "contansperf", - "crchatrev", - "winstmsg2tf", - "creatgoglt", - "creatorv2t", - "sydconfigoptt", - "adssqovroff", - "530pstho", - "517opinion", - "418dhlth", - "512sprtic1s0", - "emsgpr", - "525ptrcps0", + "newmma-prod", + "imgchatgptv2", + "tts2", + "voicelang2", + "anssupfotest", + "emptyoson", + "tempcacheread", + "temptacache", + "ctrlworkpay", + "winlongmsg2tf", + "628fabocs0", + "531rai268s0", + "602refusal", + "621alllocs0", + "621docxfmtho", + "621preclsvn", + "330uaug", "529rweas0", - "515oscfing2s0", - "524vidansgs0", + "0626snptrcs0", + "619dagslnv1nr" ], "verbosity": "verbose", "traceId": get_ran_hex(32), @@ -119,10 +137,14 @@ def update( "messageType": "Chat", "messageId": message_id, "requestId": message_id, + "imageUrl": image_url, + "originalImageUrl": original_image_url, }, "tone": conversation_style.name.capitalize(), # Make first letter uppercase + "spokenTextMode": "None", "requestId": message_id, "conversationSignature": self.conversation_signature, + "encryptedConversationSignature": self.encrypted_conversation_signature, "participant": { "id": self.client_id, }, @@ -133,7 +155,7 @@ def update( "target": "chat", "type": 4, } - if search_result: + if (not no_search) and (search_result): have_search_result = [ "InternalSearchQuery", "InternalSearchResult", diff --git a/src/EdgeGPT/utilities.py b/src/EdgeGPT/utilities.py index f54ccad7a..c5471c1ad 100644 --- a/src/EdgeGPT/utilities.py +++ b/src/EdgeGPT/utilities.py @@ -2,7 +2,7 @@ import locale import random import sys -from typing import Union +from typing import List, Union from .constants import DELIMITER from .locale import LocationHint @@ -35,3 +35,12 @@ def guess_locale() -> str: return "en-us" loc, _ = locale.getlocale() return loc.replace("_", "-") if loc else "en-us" + + +def cookies_to_dict(cookies: List[dict]) -> dict: + if cookies is None: + return None + all_cookies = {} + for cookie in cookies: + all_cookies[cookie["name"]] = cookie["value"] + return all_cookies diff --git a/test_base.py b/test_base.py index 0957544e0..6726ad4f7 100644 --- a/test_base.py +++ b/test_base.py @@ -1,19 +1,25 @@ import json +import os import pytest -from EdgeGPT.EdgeGPT import Chatbot -from EdgeGPT.EdgeGPT import ConversationStyle + +from src.EdgeGPT.EdgeGPT import Chatbot, ConversationStyle pytest_plugins = ("pytest_asyncio",) from os import getenv +cookies = json.loads( + open("C:\\Users\\Codete\\.config\\bing-cookies_1.json", encoding="utf-8").read() +) + +# @pytest.disable() @pytest.mark.asyncio() async def test_ask() -> None: - bot = await Chatbot.create(cookies=getenv("EDGE_COOKIES")) + bot = await Chatbot.create(cookies=cookies or getenv("EDGE_COOKIES")) response = await bot.ask( - prompt="find me some information about the new ai released by meta.", + prompt="tell me a joke about cats", conversation_style=ConversationStyle.balanced, simplify_response=True, )