diff --git a/packages/sdk-java/pom.xml b/packages/sdk-java/pom.xml index 45c9ea895..346731815 100644 --- a/packages/sdk-java/pom.xml +++ b/packages/sdk-java/pom.xml @@ -25,6 +25,7 @@ 1.8 UTF-8 3.6.0 + 0.8.12 5.14.1 1.3.16 2.0.60 @@ -81,6 +82,25 @@ + + org.jacoco + jacoco-maven-plugin + ${jacoco-maven-plugin.version} + + + + prepare-agent + + + + report + test + + report + + + + diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/Options.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/Options.java deleted file mode 100644 index 82b0a4652..000000000 --- a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/Options.java +++ /dev/null @@ -1,6 +0,0 @@ -package com.alibaba.qwen.code.cli; - -import com.alibaba.qwen.code.cli.transport.TransportOptions; - -public class Options extends TransportOptions { -} diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/QwenCli.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/QwenCli.java deleted file mode 100644 index 0471ab692..000000000 --- a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/QwenCli.java +++ /dev/null @@ -1,54 +0,0 @@ -package com.alibaba.qwen.code.cli; - -import java.util.ArrayList; -import java.util.List; - -import com.alibaba.qwen.code.cli.protocol.message.Message; -import com.alibaba.qwen.code.cli.protocol.message.SDKSystemMessage; -import com.alibaba.qwen.code.cli.protocol.message.assistant.SDKAssistantMessage; -import com.alibaba.qwen.code.cli.session.Session; -import com.alibaba.qwen.code.cli.session.event.SessionEventSimpleConsumers; -import com.alibaba.qwen.code.cli.transport.Transport; -import com.alibaba.qwen.code.cli.transport.process.ProcessTransport; - -public class QwenCli { - public static List query(String prompt) { - Transport transport; - try { - transport = new ProcessTransport(); - } catch (Exception e) { - throw new RuntimeException("initialized ProcessTransport error!", e); - } - - Session session; - try { - session = new Session(transport); - } catch (Exception e) { - throw new RuntimeException("initialized Session error!", e); - } - - final List response = new ArrayList<>(); - try { - session.sendPrompt(prompt, new SessionEventSimpleConsumers() { - @Override - public void onSystemMessage(Session session, SDKSystemMessage systemMessage) { - response.add(systemMessage); - } - - @Override - public void onAssistantMessage(Session session, SDKAssistantMessage assistantMessage) { - response.add(assistantMessage); - } - }); - } catch (Exception e) { - throw new RuntimeException("sendPrompt error!", e); - } - - try { - session.close(); - } catch (Exception e) { - throw new RuntimeException("close Session error!", e); - } - return response; - } -} diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/QwenCodeCli.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/QwenCodeCli.java new file mode 100644 index 000000000..591552fc3 --- /dev/null +++ b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/QwenCodeCli.java @@ -0,0 +1,70 @@ +package com.alibaba.qwen.code.cli; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.qwen.code.cli.protocol.data.AssistantContent; +import com.alibaba.qwen.code.cli.protocol.data.behavior.Behavior.Operation; +import com.alibaba.qwen.code.cli.session.Session; +import com.alibaba.qwen.code.cli.session.event.SessionEventSimpleConsumers; +import com.alibaba.qwen.code.cli.transport.Transport; +import com.alibaba.qwen.code.cli.transport.TransportOptions; +import com.alibaba.qwen.code.cli.transport.process.ProcessTransport; +import com.alibaba.qwen.code.cli.utils.MyConcurrentUtils; +import com.alibaba.qwen.code.cli.utils.Timeout; + +public class QwenCodeCli { + public static List simpleQuery(String prompt) { + final List response = new ArrayList<>(); + MyConcurrentUtils.runAndWait(() -> simpleQuery(prompt, response::add), Timeout.TIMEOUT_30_MINUTES); + return response; + } + + public static void simpleQuery(String prompt, Consumer messageConsumer) { + Session session = newSessionWithProcessTransport(new TransportOptions()); + try { + session.sendPrompt(prompt, new SessionEventSimpleConsumers() { + @Override + public void onAssistantMessageIncludePartial(Session session, List assistantContents, AssistantMessageOutputType assistantMessageOutputType) { + messageConsumer.accept(assistantContents.stream() + .map(AssistantContent::getContent) + .map(content -> { + if (content instanceof String) { + return (String) content; + } else { + return JSON.toJSONString(content); + } + }).collect(Collectors.joining())); + } + }.setDefaultPermissionOperation(Operation.allow)); + } catch (Exception e) { + throw new RuntimeException("sendPrompt error!", e); + } + + try { + session.close(); + } catch (Exception e) { + throw new RuntimeException("close Session error!", e); + } + } + + public static Session newSessionWithProcessTransport(TransportOptions transportOptions) { + Transport transport; + try { + transport = new ProcessTransport(transportOptions); + } catch (Exception e) { + throw new RuntimeException("initialized ProcessTransport error!", e); + } + + Session session; + try { + session = new Session(transport); + } catch (Exception e) { + throw new RuntimeException("initialized Session error!", e); + } + return session; + } +} diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/data/AssistantContent.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/data/AssistantContent.java new file mode 100644 index 000000000..40d7f520d --- /dev/null +++ b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/data/AssistantContent.java @@ -0,0 +1,6 @@ +package com.alibaba.qwen.code.cli.protocol.data; + +public interface AssistantContent { + String getType(); + Object getContent(); +} diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/SDKAssistantMessage.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/SDKAssistantMessage.java index 7e906fc44..b0a3012c4 100644 --- a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/SDKAssistantMessage.java +++ b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/SDKAssistantMessage.java @@ -15,6 +15,11 @@ public class SDKAssistantMessage extends MessageBase { @JSONField(name = "parent_tool_use_id") private String parentToolUseId; + public SDKAssistantMessage() { + super(); + this.type = "assistant"; + } + public String getUuid() { return uuid; } diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/SDKPartialAssistantMessage.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/SDKPartialAssistantMessage.java new file mode 100644 index 000000000..a9ac24d05 --- /dev/null +++ b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/SDKPartialAssistantMessage.java @@ -0,0 +1,55 @@ +package com.alibaba.qwen.code.cli.protocol.message.assistant; + +import com.alibaba.fastjson2.annotation.JSONField; +import com.alibaba.fastjson2.annotation.JSONType; +import com.alibaba.qwen.code.cli.protocol.message.MessageBase; +import com.alibaba.qwen.code.cli.protocol.message.assistant.event.StreamEvent; + +@JSONType(typeKey = "type", typeName = "stream_event") +public class SDKPartialAssistantMessage extends MessageBase { + private String uuid; + + @JSONField(name = "session_id") + private String sessionId; + private StreamEvent event; + + @JSONField(name = "parent_tool_use_id") + private String parentToolUseId; + + public SDKPartialAssistantMessage() { + super(); + this.type = "stream_event"; + } + + public String getUuid() { + return uuid; + } + + public void setUuid(String uuid) { + this.uuid = uuid; + } + + public String getSessionId() { + return sessionId; + } + + public void setSessionId(String sessionId) { + this.sessionId = sessionId; + } + + public StreamEvent getEvent() { + return event; + } + + public void setEvent(StreamEvent event) { + this.event = event; + } + + public String getParentToolUseId() { + return parentToolUseId; + } + + public void setParentToolUseId(String parentToolUseId) { + this.parentToolUseId = parentToolUseId; + } +} diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/block/ContentBlock.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/block/ContentBlock.java index 3e72ad7d0..d40200c6e 100644 --- a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/block/ContentBlock.java +++ b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/block/ContentBlock.java @@ -4,9 +4,10 @@ import java.util.List; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.annotation.JSONType; +import com.alibaba.qwen.code.cli.protocol.data.AssistantContent; @JSONType(typeKey = "type", typeName = "ContentBlock", seeAlso = { TextBlock.class, ToolResultBlock.class, ThinkingBlock.class, ToolUseBlock.class }) -public class ContentBlock { +public abstract class ContentBlock implements AssistantContent { protected String type; protected List annotations; diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/block/TextBlock.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/block/TextBlock.java index 86e5513d3..7a8cf7d43 100644 --- a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/block/TextBlock.java +++ b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/block/TextBlock.java @@ -13,4 +13,9 @@ public class TextBlock extends ContentBlock { public void setText(String text) { this.text = text; } + + @Override + public Object getContent() { + return text; + } } diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/block/ThinkingBlock.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/block/ThinkingBlock.java index fa479563f..4a133840f 100644 --- a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/block/ThinkingBlock.java +++ b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/block/ThinkingBlock.java @@ -22,4 +22,9 @@ public class ThinkingBlock extends ContentBlock{ public void setSignature(String signature) { this.signature = signature; } + + @Override + public Object getContent() { + return thinking; + } } diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/block/ToolUseBlock.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/block/ToolUseBlock.java index 58a3bd4fc..ef5de8b02 100644 --- a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/block/ToolUseBlock.java +++ b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/block/ToolUseBlock.java @@ -46,4 +46,9 @@ public class ToolUseBlock extends ContentBlock { public void setAnnotations(List annotations) { this.annotations = annotations; } + + @Override + public Object getContent() { + return input; + } } diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/event/ContentBlockDeltaEvent.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/event/ContentBlockDeltaEvent.java new file mode 100644 index 000000000..78b7961cc --- /dev/null +++ b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/event/ContentBlockDeltaEvent.java @@ -0,0 +1,96 @@ +package com.alibaba.qwen.code.cli.protocol.message.assistant.event; + +import com.alibaba.fastjson2.annotation.JSONField; +import com.alibaba.fastjson2.annotation.JSONType; +import com.alibaba.qwen.code.cli.protocol.data.AssistantContent; + +@JSONType(typeKey = "type", typeName = "content_block_delta") +public class ContentBlockDeltaEvent extends StreamEvent { + private int index; + private ContentBlockDelta delta; + + public int getIndex() { + return index; + } + + public void setIndex(int index) { + this.index = index; + } + + public ContentBlockDelta getDelta() { + return delta; + } + + public void setDelta(ContentBlockDelta delta) { + this.delta = delta; + } + + @JSONType(typeKey = "type", typeName = "ContentBlockDelta", + seeAlso = {ContentBlockDeltaText.class, ContentBlockDeltaThinking.class, ContentBlockDeltaInputJson.class}) + public abstract static class ContentBlockDelta implements AssistantContent { + private String type; + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + } + + @JSONType(typeKey = "type", typeName = "text_delta") + public static class ContentBlockDeltaText extends ContentBlockDelta { + private String text; + + public String getText() { + return text; + } + + public void setText(String text) { + this.text = text; + } + + @Override + public Object getContent() { + return text; + } + } + + @JSONType(typeKey = "type", typeName = "thinking_delta") + public static class ContentBlockDeltaThinking extends ContentBlockDelta { + private String thinking; + + public String getThinking() { + return thinking; + } + + public void setThinking(String thinking) { + this.thinking = thinking; + } + + @Override + public Object getContent() { + return thinking; + } + } + + @JSONType(typeKey = "type", typeName = "input_json_delta") + public static class ContentBlockDeltaInputJson extends ContentBlockDelta { + @JSONField(name = "partial_json") + private String partialJson; + + public String getPartialJson() { + return partialJson; + } + + public void setPartialJson(String partialJson) { + this.partialJson = partialJson; + } + + @Override + public Object getContent() { + return partialJson; + } + } +} diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/event/ContentBlockStartEvent.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/event/ContentBlockStartEvent.java new file mode 100644 index 000000000..884558512 --- /dev/null +++ b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/event/ContentBlockStartEvent.java @@ -0,0 +1,13 @@ +package com.alibaba.qwen.code.cli.protocol.message.assistant.event; + +import com.alibaba.fastjson2.annotation.JSONField; +import com.alibaba.fastjson2.annotation.JSONType; +import com.alibaba.qwen.code.cli.protocol.message.assistant.block.ContentBlock; + +@JSONType(typeKey = "type", typeName = "content_block_start") +public class ContentBlockStartEvent extends StreamEvent{ + private int index; + + @JSONField(name = "content_block") + private ContentBlock contentBlock; +} diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/event/ContentBlockStopEvent.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/event/ContentBlockStopEvent.java new file mode 100644 index 000000000..0e950f817 --- /dev/null +++ b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/event/ContentBlockStopEvent.java @@ -0,0 +1,16 @@ +package com.alibaba.qwen.code.cli.protocol.message.assistant.event; + +import com.alibaba.fastjson2.annotation.JSONType; + +@JSONType(typeKey = "type", typeName = "content_block_stop") +public class ContentBlockStopEvent extends StreamEvent{ + Long index; + + public Long getIndex() { + return index; + } + + public void setIndex(Long index) { + this.index = index; + } +} diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/event/MessageStartStreamEvent.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/event/MessageStartStreamEvent.java new file mode 100644 index 000000000..88be40545 --- /dev/null +++ b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/event/MessageStartStreamEvent.java @@ -0,0 +1,47 @@ +package com.alibaba.qwen.code.cli.protocol.message.assistant.event; + +import com.alibaba.fastjson2.annotation.JSONType; + +@JSONType(typeName = "message_start") +public class MessageStartStreamEvent extends StreamEvent{ + private Message message; + + public static class Message { + private String id; + private String role; + private String model; + + // Getters and setters + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getRole() { + return role; + } + + public void setRole(String role) { + this.role = role; + } + + public String getModel() { + return model; + } + + public void setModel(String model) { + this.model = model; + } + } + + public Message getMessage() { + return message; + } + + public void setMessage(Message message) { + this.message = message; + } +} diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/event/MessageStopStreamEvent.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/event/MessageStopStreamEvent.java new file mode 100644 index 000000000..3ea04bc50 --- /dev/null +++ b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/event/MessageStopStreamEvent.java @@ -0,0 +1,7 @@ +package com.alibaba.qwen.code.cli.protocol.message.assistant.event; + +import com.alibaba.fastjson2.annotation.JSONType; + +@JSONType(typeName = "message_stop") +public class MessageStopStreamEvent extends StreamEvent{ +} diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/event/StreamEvent.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/event/StreamEvent.java new file mode 100644 index 000000000..d288402fa --- /dev/null +++ b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/assistant/event/StreamEvent.java @@ -0,0 +1,18 @@ +package com.alibaba.qwen.code.cli.protocol.message.assistant.event; + +import com.alibaba.fastjson2.annotation.JSONType; + +@JSONType(typeKey = "type", typeName = "StreamEvent", + seeAlso = {MessageStartStreamEvent.class, MessageStopStreamEvent.class, ContentBlockStartEvent.class, ContentBlockStopEvent.class, + ContentBlockDeltaEvent.class}) +public class StreamEvent { + protected String type; + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } +} diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/control/CLIControlSetModelResponse.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/control/CLIControlSetModelResponse.java new file mode 100644 index 000000000..71d6b0e38 --- /dev/null +++ b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/protocol/message/control/CLIControlSetModelResponse.java @@ -0,0 +1,22 @@ +package com.alibaba.qwen.code.cli.protocol.message.control; + +public class CLIControlSetModelResponse { + String subtype = "set_model"; + String model; + + public String getSubtype() { + return subtype; + } + + public void setSubtype(String subtype) { + this.subtype = subtype; + } + + public String getModel() { + return model; + } + + public void setModel(String model) { + this.model = model; + } +} diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/session/Session.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/session/Session.java index 79a210742..c3e605476 100644 --- a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/session/Session.java +++ b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/session/Session.java @@ -2,6 +2,8 @@ package com.alibaba.qwen.code.cli.session; import java.io.IOException; import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; @@ -15,6 +17,7 @@ import com.alibaba.qwen.code.cli.protocol.message.SDKResultMessage; import com.alibaba.qwen.code.cli.protocol.message.SDKSystemMessage; import com.alibaba.qwen.code.cli.protocol.message.SDKUserMessage; import com.alibaba.qwen.code.cli.protocol.message.assistant.SDKAssistantMessage; +import com.alibaba.qwen.code.cli.protocol.message.assistant.SDKPartialAssistantMessage; import com.alibaba.qwen.code.cli.protocol.message.control.CLIControlInitializeRequest; import com.alibaba.qwen.code.cli.protocol.message.control.CLIControlInitializeResponse; import com.alibaba.qwen.code.cli.protocol.message.control.CLIControlInterruptRequest; @@ -28,16 +31,19 @@ import com.alibaba.qwen.code.cli.session.event.SessionEventConsumers; import com.alibaba.qwen.code.cli.session.exception.SessionControlException; import com.alibaba.qwen.code.cli.session.exception.SessionSendPromptException; import com.alibaba.qwen.code.cli.transport.Transport; +import com.alibaba.qwen.code.cli.utils.MyConcurrentUtils; +import com.alibaba.qwen.code.cli.utils.Timeout; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Session { + private static final Logger log = LoggerFactory.getLogger(Session.class); private final Transport transport; private CLIControlInitializeResponse lastCliControlInitializeResponse; private SDKSystemMessage lastSdkSystemMessage; - private static final Logger log = LoggerFactory.getLogger(Session.class); + private final Timeout defaultEventTimeout = Timeout.TIMEOUT_60_SECONDS; public Session(Transport transport) throws SessionControlException { if (transport == null || !transport.isAvailable()) { @@ -61,43 +67,44 @@ public class Session { } } - public void interrupt() throws SessionControlException { - if (!isAvailable()) { - throw new SessionControlException("Session is not available"); - } - + public void close() throws SessionControlException { try { - transport.inputNoWaitResponse( - new CLIControlRequest().setRequest(new CLIControlInterruptRequest()).toString()); + transport.close(); } catch (Exception e) { - throw new SessionControlException("Failed to interrupt the session", e); + throw new SessionControlException("Failed to close the session", e); } } - public void setModel(String modelName) throws SessionControlException { - if (!isAvailable()) { - throw new SessionControlException("Session is not available"); - } + public Optional interrupt() throws SessionControlException { + checkAvailable(); + return processControlRequest(new CLIControlRequest().setRequest(new CLIControlInterruptRequest()).toString()); + } + public Optional setModel(String modelName) throws SessionControlException { + checkAvailable(); CLIControlSetModelRequest cliControlSetModelRequest = new CLIControlSetModelRequest(); cliControlSetModelRequest.setModel(modelName); - try { - transport.inputNoWaitResponse(new CLIControlRequest().setRequest(cliControlSetModelRequest).toString()); - } catch (Exception e) { - throw new SessionControlException("Failed to set model", e); - } + return processControlRequest(new CLIControlRequest().setRequest(cliControlSetModelRequest).toString()); } - public void setPermissionMode(PermissionMode permissionMode) throws SessionControlException { - if (!isAvailable()) { - throw new SessionControlException("Session is not available"); - } - + public Optional setPermissionMode(PermissionMode permissionMode) throws SessionControlException { + checkAvailable(); CLIControlSetPermissionModeRequest cliControlSetPermissionModeRequest = new CLIControlSetPermissionModeRequest(); cliControlSetPermissionModeRequest.setMode(permissionMode.getValue()); + return processControlRequest( + new CLIControlRequest().setRequest(cliControlSetPermissionModeRequest).toString()); + } + + private Optional processControlRequest(String request) throws SessionControlException { try { - transport.inputNoWaitResponse( - new CLIControlRequest().setRequest(cliControlSetPermissionModeRequest).toString()); + if (transport.isReading()) { + transport.inputNoWaitResponse(request); + return Optional.empty(); + } else { + String response = transport.inputWaitForOneLine(request); + CLIControlResponse cliControlResponse = JSON.parseObject(response, new TypeReference>() {}); + return Optional.of("success".equals(cliControlResponse.getResponse().getSubtype())); + } } catch (Exception e) { throw new SessionControlException("Failed to set model", e); } @@ -108,61 +115,43 @@ public class Session { } public void resumeSession(String sessionId) throws SessionControlException { - if (!isAvailable()) { - throw new SessionControlException("Session is not available"); - } - if (StringUtils.isNotBlank(sessionId)) { transport.getTransportOptions().setResumeSessionId(sessionId); } this.start(); } - public String getSessionId() { - return Optional.ofNullable(lastSdkSystemMessage).map(SDKSystemMessage::getSessionId).orElse(null); - } - - public void close() throws SessionControlException { - try { - transport.close(); - } catch (Exception e) { - throw new SessionControlException("Failed to close the session", e); - } - } - - public boolean isAvailable() { - return transport.isAvailable(); - } - - public Capabilities getCapabilities() { - return Optional.ofNullable(lastCliControlInitializeResponse).map(CLIControlInitializeResponse::getCapabilities).orElse(new Capabilities()); - } - - public void sendPrompt(String prompt, SessionEventConsumers sessionEventConsumers) throws SessionSendPromptException { - if (!transport.isAvailable()) { - throw new SessionSendPromptException("Session is not available"); - } - + public void sendPrompt(String prompt, SessionEventConsumers sessionEventConsumers) throws SessionSendPromptException, SessionControlException { + checkAvailable(); try { transport.inputWaitForMultiLine(new SDKUserMessage().setContent(prompt).toString(), (line) -> { - log.debug("read a message from agent {}", line); JSONObject jsonObject = JSON.parseObject(line); String messageType = jsonObject.getString("type"); if ("system".equals(messageType)) { lastSdkSystemMessage = jsonObject.to(SDKSystemMessage.class); - sessionEventConsumers.onSystemMessage(this, lastSdkSystemMessage); + MyConcurrentUtils.runAndWait(() -> sessionEventConsumers.onSystemMessage(this, lastSdkSystemMessage), + Optional.ofNullable(sessionEventConsumers.onSystemMessageTimeout(this)).orElse(defaultEventTimeout)); return false; } else if ("assistant".equals(messageType)) { - sessionEventConsumers.onAssistantMessage(this, jsonObject.to(SDKAssistantMessage.class)); + MyConcurrentUtils.runAndWait(() -> sessionEventConsumers.onAssistantMessage(this, jsonObject.to(SDKAssistantMessage.class)), + Optional.ofNullable(sessionEventConsumers.onAssistantMessageTimeout(this)).orElse(defaultEventTimeout)); + return false; + } else if ("stream_event".equals(messageType)) { + MyConcurrentUtils.runAndWait(() -> sessionEventConsumers.onPartialAssistantMessage(this, jsonObject.to(SDKPartialAssistantMessage.class)), + Optional.ofNullable(sessionEventConsumers.onPartialAssistantMessageTimeout(this)).orElse(defaultEventTimeout)); return false; } else if ("user".equals(messageType)) { - sessionEventConsumers.onUserMessage(this, jsonObject.to(SDKUserMessage.class, Feature.FieldBased)); + MyConcurrentUtils.runAndWait( + () -> sessionEventConsumers.onUserMessage(this, jsonObject.to(SDKUserMessage.class, Feature.FieldBased)), + Optional.ofNullable(sessionEventConsumers.onUserMessageTimeout(this)).orElse(defaultEventTimeout)); return false; } else if ("result".equals(messageType)) { - sessionEventConsumers.onResultMessage(this, jsonObject.to(SDKResultMessage.class)); + MyConcurrentUtils.runAndWait(() -> sessionEventConsumers.onResultMessage(this, jsonObject.to(SDKResultMessage.class)), + Optional.ofNullable(sessionEventConsumers.onResultMessageTimeout(this)).orElse(defaultEventTimeout)); return true; } else if ("control_response".equals(messageType)) { - sessionEventConsumers.onControlResponse(this, jsonObject.to(CLIControlResponse.class)); + MyConcurrentUtils.runAndWait(() -> sessionEventConsumers.onControlResponse(this, jsonObject.to(CLIControlResponse.class)), + Optional.ofNullable(sessionEventConsumers.onControlResponseTimeout(this)).orElse(defaultEventTimeout)); if (!"error".equals(jsonObject.getString("subtype"))) { return false; } else { @@ -170,10 +159,11 @@ public class Session { return "error".equals(jsonObject.getString("subtype")); } } else if ("control_request".equals(messageType)) { - return processControlRequest(jsonObject, sessionEventConsumers); + return processControlRequestInThePrompting(jsonObject, sessionEventConsumers); } else { log.warn("unknown message type: {}", messageType); - sessionEventConsumers.onOtherMessage(this, line); + MyConcurrentUtils.runAndWait(() -> sessionEventConsumers.onOtherMessage(this, line), + Optional.ofNullable(sessionEventConsumers.onOtherMessageTimeout(this)).orElse(defaultEventTimeout)); return false; } }); @@ -182,7 +172,7 @@ public class Session { } } - private boolean processControlRequest(JSONObject jsonObject, SessionEventConsumers sessionEventConsumers) { + private boolean processControlRequestInThePrompting(JSONObject jsonObject, SessionEventConsumers sessionEventConsumers) { String subType = Optional.of(jsonObject) .map(cr -> cr.getJSONObject("request")) .map(r -> r.getString("subtype")) @@ -190,13 +180,21 @@ public class Session { if ("can_use_tool".equals(subType)) { try { return processPermissionResponse(jsonObject, sessionEventConsumers); - } catch (IOException e) { + } catch (IOException | ExecutionException | InterruptedException | TimeoutException e) { log.error("Failed to process permission response", e); return false; } } else { - CLIControlResponse cliControlResponse = sessionEventConsumers.onControlRequest(this, - jsonObject.to(new TypeReference>() {})); + CLIControlResponse cliControlResponse; + try { + cliControlResponse = MyConcurrentUtils.runAndWait( + () -> sessionEventConsumers.onControlRequest(this, jsonObject.to(new TypeReference>() {})), + Optional.ofNullable(sessionEventConsumers.onControlRequestTimeout(this)).orElse(defaultEventTimeout)); + } catch (Exception e) { + log.error("Failed to process control request", e); + return false; + } + if (cliControlResponse != null) { try { transport.inputNoWaitResponse(cliControlResponse.toString()); @@ -209,9 +207,13 @@ public class Session { } } - private boolean processPermissionResponse(JSONObject jsonObject, SessionEventConsumers sessionEventConsumers) throws IOException { - CLIControlRequest permissionRequest = jsonObject.to(new TypeReference>() {}); - Behavior behavior = Optional.ofNullable(sessionEventConsumers.onPermissionRequest(this, permissionRequest)) + private boolean processPermissionResponse(JSONObject jsonObject, SessionEventConsumers sessionEventConsumers) + throws IOException, ExecutionException, InterruptedException, TimeoutException { + CLIControlRequest permissionRequest = jsonObject.to( + new TypeReference>() {}); + + Behavior behavior = Optional.ofNullable(MyConcurrentUtils.runAndWait(() -> sessionEventConsumers.onPermissionRequest(this, permissionRequest), + Optional.ofNullable(sessionEventConsumers.onPermissionRequestTimeout(this)).orElse(defaultEventTimeout))) .map(b -> { if (b instanceof Allow) { Allow allow = (Allow) b; @@ -223,11 +225,30 @@ public class Session { }) .orElse(Behavior.defaultBehavior()); CLIControlResponse permissionResponse = new CLIControlResponse<>(); - permissionResponse.createResponse().setResponse(new CLIControlPermissionResponse().setBehavior(behavior)).setRequestId(permissionRequest.getRequestId()); + permissionResponse.createResponse().setResponse(new CLIControlPermissionResponse().setBehavior(behavior)).setRequestId( + permissionRequest.getRequestId()); String permissionMessage = permissionResponse.toString(); log.debug("send permission message to agent: {}", permissionMessage); transport.inputNoWaitResponse(permissionMessage); return false; } + + public String getSessionId() { + return Optional.ofNullable(lastSdkSystemMessage).map(SDKSystemMessage::getSessionId).orElse(null); + } + + public boolean isAvailable() { + return transport.isAvailable(); + } + + public Capabilities getCapabilities() { + return Optional.ofNullable(lastCliControlInitializeResponse).map(CLIControlInitializeResponse::getCapabilities).orElse(new Capabilities()); + } + + private void checkAvailable() throws SessionControlException { + if (!isAvailable()) { + throw new SessionControlException("Session is not available"); + } + } } diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/session/event/SessionEventConsumers.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/session/event/SessionEventConsumers.java index e2100b5cc..686620cfa 100644 --- a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/session/event/SessionEventConsumers.java +++ b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/session/event/SessionEventConsumers.java @@ -5,10 +5,12 @@ import com.alibaba.qwen.code.cli.protocol.message.SDKResultMessage; import com.alibaba.qwen.code.cli.protocol.message.SDKSystemMessage; import com.alibaba.qwen.code.cli.protocol.message.SDKUserMessage; import com.alibaba.qwen.code.cli.protocol.message.assistant.SDKAssistantMessage; +import com.alibaba.qwen.code.cli.protocol.message.assistant.SDKPartialAssistantMessage; import com.alibaba.qwen.code.cli.protocol.message.control.CLIControlPermissionRequest; import com.alibaba.qwen.code.cli.protocol.message.control.CLIControlRequest; import com.alibaba.qwen.code.cli.protocol.message.control.CLIControlResponse; import com.alibaba.qwen.code.cli.session.Session; +import com.alibaba.qwen.code.cli.utils.Timeout; public interface SessionEventConsumers { void onSystemMessage(Session session, SDKSystemMessage systemMessage); @@ -17,6 +19,8 @@ public interface SessionEventConsumers { void onAssistantMessage(Session session, SDKAssistantMessage assistantMessage); + void onPartialAssistantMessage(Session session, SDKPartialAssistantMessage partialAssistantMessage); + void onUserMessage(Session session, SDKUserMessage userMessage); void onOtherMessage(Session session, String message); @@ -26,4 +30,22 @@ public interface SessionEventConsumers { CLIControlResponse onControlRequest(Session session, CLIControlRequest cliControlRequest); Behavior onPermissionRequest(Session session, CLIControlRequest permissionRequest); + + Timeout onSystemMessageTimeout(Session session); + + Timeout onResultMessageTimeout(Session session); + + Timeout onAssistantMessageTimeout(Session session); + + Timeout onPartialAssistantMessageTimeout(Session session); + + Timeout onUserMessageTimeout(Session session); + + Timeout onOtherMessageTimeout(Session session); + + Timeout onControlResponseTimeout(Session session); + + Timeout onControlRequestTimeout(Session session); + + Timeout onPermissionRequestTimeout(Session session); } diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/session/event/SessionEventSimpleConsumers.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/session/event/SessionEventSimpleConsumers.java index 9c685e755..fe21bbe96 100644 --- a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/session/event/SessionEventSimpleConsumers.java +++ b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/session/event/SessionEventSimpleConsumers.java @@ -1,14 +1,28 @@ package com.alibaba.qwen.code.cli.session.event; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +import com.alibaba.qwen.code.cli.protocol.data.AssistantContent; +import com.alibaba.qwen.code.cli.protocol.data.behavior.Allow; import com.alibaba.qwen.code.cli.protocol.data.behavior.Behavior; +import com.alibaba.qwen.code.cli.protocol.data.behavior.Behavior.Operation; +import com.alibaba.qwen.code.cli.protocol.data.behavior.Deny; import com.alibaba.qwen.code.cli.protocol.message.SDKResultMessage; import com.alibaba.qwen.code.cli.protocol.message.SDKSystemMessage; import com.alibaba.qwen.code.cli.protocol.message.SDKUserMessage; import com.alibaba.qwen.code.cli.protocol.message.assistant.SDKAssistantMessage; +import com.alibaba.qwen.code.cli.protocol.message.assistant.SDKPartialAssistantMessage; +import com.alibaba.qwen.code.cli.protocol.message.assistant.event.ContentBlockDeltaEvent; +import com.alibaba.qwen.code.cli.protocol.message.assistant.event.StreamEvent; import com.alibaba.qwen.code.cli.protocol.message.control.CLIControlPermissionRequest; import com.alibaba.qwen.code.cli.protocol.message.control.CLIControlRequest; import com.alibaba.qwen.code.cli.protocol.message.control.CLIControlResponse; import com.alibaba.qwen.code.cli.session.Session; +import com.alibaba.qwen.code.cli.utils.Timeout; public class SessionEventSimpleConsumers implements SessionEventConsumers { @Override @@ -21,6 +35,22 @@ public class SessionEventSimpleConsumers implements SessionEventConsumers { @Override public void onAssistantMessage(Session session, SDKAssistantMessage assistantMessage) { + onAssistantMessageIncludePartial(session, Optional.ofNullable(assistantMessage.getMessage().getContent()) + .map(cbs -> cbs.stream().map(cb -> (AssistantContent) cb).collect(Collectors.toList())) + .orElse(new ArrayList<>()), AssistantMessageOutputType.entire); + } + + @Override + public void onPartialAssistantMessage(Session session, SDKPartialAssistantMessage partialAssistantMessage) { + StreamEvent event = partialAssistantMessage.getEvent(); + if (!(event instanceof ContentBlockDeltaEvent)) { + return; + } + onAssistantMessageIncludePartial(session, Collections.singletonList(((ContentBlockDeltaEvent) event).getDelta()), AssistantMessageOutputType.partial); + } + + public void onAssistantMessageIncludePartial(Session session, List assistantContents, + AssistantMessageOutputType assistantMessageOutputType) { } @Override @@ -42,6 +72,89 @@ public class SessionEventSimpleConsumers implements SessionEventConsumers { @Override public Behavior onPermissionRequest(Session session, CLIControlRequest permissionRequest) { - return Behavior.defaultBehavior(); + if (Operation.deny.equals(this.defaultPermissionOperation)) { + return new Deny().setMessage("Permission denied."); + } else { + return new Allow().setUpdatedInput(permissionRequest.getRequest().getInput()); + } + } + + @Override + public Timeout onSystemMessageTimeout(Session session) { + return defaultEventTimeout; + } + + @Override + public Timeout onResultMessageTimeout(Session session) { + return defaultEventTimeout; + } + + @Override + public Timeout onAssistantMessageTimeout(Session session) { + return defaultEventTimeout; + } + + @Override + public Timeout onPartialAssistantMessageTimeout(Session session) { + return defaultEventTimeout; + } + + @Override + public Timeout onUserMessageTimeout(Session session) { + return defaultEventTimeout; + } + + @Override + public Timeout onOtherMessageTimeout(Session session) { + return defaultEventTimeout; + } + + @Override + public Timeout onControlResponseTimeout(Session session) { + return defaultEventTimeout; + } + + @Override + public Timeout onControlRequestTimeout(Session session) { + return defaultEventTimeout; + } + + @Override + public Timeout onPermissionRequestTimeout(Session session) { + return defaultEventTimeout; + } + + public Timeout getDefaultEventTimeout() { + return defaultEventTimeout; + } + + public SessionEventSimpleConsumers setDefaultEventTimeout(Timeout defaultEventTimeout) { + this.defaultEventTimeout = defaultEventTimeout; + return this; + } + + public Operation getDefaultPermissionOperation() { + return defaultPermissionOperation; + } + + public SessionEventSimpleConsumers setDefaultPermissionOperation(Operation defaultPermissionOperation) { + this.defaultPermissionOperation = defaultPermissionOperation; + return this; + } + + public SessionEventSimpleConsumers() { + } + + public SessionEventSimpleConsumers(Operation defaultPermissionOperation, Timeout defaultEventTimeout) { + this.defaultPermissionOperation = defaultPermissionOperation; + this.defaultEventTimeout = defaultEventTimeout; + } + + private Operation defaultPermissionOperation = Operation.deny; + protected Timeout defaultEventTimeout = Timeout.TIMEOUT_60_SECONDS; + + public enum AssistantMessageOutputType { + entire, + partial } } diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/Transport.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/Transport.java index b3d69ee28..af4266f45 100644 --- a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/Transport.java +++ b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/Transport.java @@ -8,6 +8,8 @@ import java.util.function.Function; public interface Transport { TransportOptions getTransportOptions(); + boolean isReading(); + void start() throws IOException; void close() throws IOException; diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/TransportOptions.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/TransportOptions.java index b5e6ada6f..7e274d1a0 100644 --- a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/TransportOptions.java +++ b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/TransportOptions.java @@ -4,6 +4,7 @@ import java.util.List; import java.util.Map; import com.alibaba.qwen.code.cli.protocol.data.PermissionMode; +import com.alibaba.qwen.code.cli.utils.Timeout; public class TransportOptions implements Cloneable { private String pathToQwenExecutable; @@ -17,120 +18,154 @@ public class TransportOptions implements Cloneable { private List allowedTools; private String authType; private Boolean includePartialMessages; - private Long turnTimeoutMs; - private Long messageTimeoutMs; + private Boolean skillsEnable; + private Timeout turnTimeout; + private Timeout messageTimeout; private String resumeSessionId; + private List otherOptions; public String getPathToQwenExecutable() { return pathToQwenExecutable; } - public void setPathToQwenExecutable(String pathToQwenExecutable) { + public TransportOptions setPathToQwenExecutable(String pathToQwenExecutable) { this.pathToQwenExecutable = pathToQwenExecutable; + return this; } public String getCwd() { return cwd; } - public void setCwd(String cwd) { + public TransportOptions setCwd(String cwd) { this.cwd = cwd; + return this; } public String getModel() { return model; } - public void setModel(String model) { + public TransportOptions setModel(String model) { this.model = model; + return this; } public PermissionMode getPermissionMode() { return permissionMode; } - public void setPermissionMode(PermissionMode permissionMode) { + public TransportOptions setPermissionMode(PermissionMode permissionMode) { this.permissionMode = permissionMode; + return this; } public Map getEnv() { return env; } - public void setEnv(Map env) { + public TransportOptions setEnv(Map env) { this.env = env; + return this; } public Integer getMaxSessionTurns() { return maxSessionTurns; } - public void setMaxSessionTurns(Integer maxSessionTurns) { + public TransportOptions setMaxSessionTurns(Integer maxSessionTurns) { this.maxSessionTurns = maxSessionTurns; + return this; } public List getCoreTools() { return coreTools; } - public void setCoreTools(List coreTools) { + public TransportOptions setCoreTools(List coreTools) { this.coreTools = coreTools; + return this; } public List getExcludeTools() { return excludeTools; } - public void setExcludeTools(List excludeTools) { + public TransportOptions setExcludeTools(List excludeTools) { this.excludeTools = excludeTools; + return this; } public List getAllowedTools() { return allowedTools; } - public void setAllowedTools(List allowedTools) { + public TransportOptions setAllowedTools(List allowedTools) { this.allowedTools = allowedTools; + return this; } public String getAuthType() { return authType; } - public void setAuthType(String authType) { + public TransportOptions setAuthType(String authType) { this.authType = authType; + return this; } public Boolean getIncludePartialMessages() { return includePartialMessages; } - public void setIncludePartialMessages(Boolean includePartialMessages) { + public TransportOptions setIncludePartialMessages(Boolean includePartialMessages) { this.includePartialMessages = includePartialMessages; + return this; } - public Long getTurnTimeoutMs() { - return turnTimeoutMs; + public Boolean getSkillsEnable() { + return skillsEnable; } - public void setTurnTimeoutMs(Long turnTimeoutMs) { - this.turnTimeoutMs = turnTimeoutMs; + public TransportOptions setSkillsEnable(Boolean skillsEnable) { + this.skillsEnable = skillsEnable; + return this; } - public Long getMessageTimeoutMs() { - return messageTimeoutMs; + public Timeout getTurnTimeout() { + return turnTimeout; } - public void setMessageTimeoutMs(Long messageTimeoutMs) { - this.messageTimeoutMs = messageTimeoutMs; + public TransportOptions setTurnTimeout(Timeout turnTimeout) { + this.turnTimeout = turnTimeout; + return this; + } + + public Timeout getMessageTimeout() { + return messageTimeout; + } + + public TransportOptions setMessageTimeout(Timeout messageTimeout) { + this.messageTimeout = messageTimeout; + return this; } public String getResumeSessionId() { return resumeSessionId; } - public void setResumeSessionId(String resumeSessionId) { + public TransportOptions setResumeSessionId(String resumeSessionId) { this.resumeSessionId = resumeSessionId; + return this; + } + + public List getOtherOptions() { + return otherOptions; + } + + public TransportOptions setOtherOptions(List otherOptions) { + this.otherOptions = otherOptions; + return this; } @Override diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/process/ProcessTransport.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/process/ProcessTransport.java index 14a0eb0ff..11be50ecc 100644 --- a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/process/ProcessTransport.java +++ b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/process/ProcessTransport.java @@ -2,6 +2,8 @@ package com.alibaba.qwen.code.cli.transport.process; import com.alibaba.qwen.code.cli.transport.Transport; import com.alibaba.qwen.code.cli.transport.TransportOptions; +import com.alibaba.qwen.code.cli.utils.MyConcurrentUtils; +import com.alibaba.qwen.code.cli.utils.Timeout; import org.apache.commons.lang3.exception.ContextedRuntimeException; import org.slf4j.Logger; @@ -14,29 +16,37 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.lang.ProcessBuilder.Redirect; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Function; public class ProcessTransport implements Transport { private static final Logger log = LoggerFactory.getLogger(ProcessTransport.class); private final TransportOptions transportOptions; - protected Long turnTimeoutMs; - protected Long messageTimeoutMs; + protected Timeout turnTimeout; + protected Timeout messageTimeout; protected Process process; protected BufferedWriter processInput; protected BufferedReader processOutput; protected BufferedReader processError; + protected final Consumer errorHandler; + + private final AtomicBoolean reading = new AtomicBoolean(false); public ProcessTransport() throws IOException { this(new TransportOptions()); } public ProcessTransport(TransportOptions transportOptions) throws IOException { + this(transportOptions, (line) -> log.error("process error: {}", line)); + } + + public ProcessTransport(TransportOptions transportOptions, Consumer errorHandler) throws IOException { this.transportOptions = transportOptions; + this.errorHandler = errorHandler; start(); } @@ -45,11 +55,16 @@ public class ProcessTransport implements Transport { return transportOptions; } + @Override + public boolean isReading() { + return reading.get(); + } + @Override public void start() throws IOException { TransportOptionsAdapter transportOptionsAdapter = new TransportOptionsAdapter(transportOptions); - this.turnTimeoutMs = transportOptionsAdapter.getHandledTransportOptions().getTurnTimeoutMs(); - this.messageTimeoutMs = transportOptionsAdapter.getHandledTransportOptions().getMessageTimeoutMs(); + this.turnTimeout = transportOptionsAdapter.getHandledTransportOptions().getTurnTimeout(); + this.messageTimeout = transportOptionsAdapter.getHandledTransportOptions().getMessageTimeout(); String[] commandArgs = transportOptionsAdapter.buildCommandArgs(); log.debug("trans to command args: {}", transportOptionsAdapter); @@ -91,113 +106,87 @@ public class ProcessTransport implements Transport { @Override public String inputWaitForOneLine(String message) throws IOException, ExecutionException, InterruptedException, TimeoutException { - return inputWaitForOneLine(message, turnTimeoutMs); + return inputWaitForOneLine(message, turnTimeout); } - private String inputWaitForOneLine(String message, long timeOutInMs) + private String inputWaitForOneLine(String message, Timeout timeOut) throws IOException, TimeoutException, InterruptedException, ExecutionException { inputNoWaitResponse(message); - CompletableFuture future = CompletableFuture.supplyAsync(() -> { - try { - return processOutput.readLine(); - } catch (IOException e) { - throw new ContextedRuntimeException("read line error", e) - .addContextValue("message", message); - } - }); - try { - String line = future.get(timeOutInMs, TimeUnit.MILLISECONDS); + reading.set(true); + String line = MyConcurrentUtils.runAndWait(() -> { + try { + return processOutput.readLine(); + } catch (IOException e) { + throw new ContextedRuntimeException("read line error", e) + .addContextValue("message", message); + } + }, timeOut); log.info("inputWaitForOneLine result: {}", line); return line; - } catch (TimeoutException e) { - future.cancel(true); - log.warn("read message timeout {}, canceled readOneLine task", timeOutInMs, e); - throw e; - } catch (InterruptedException e) { - future.cancel(true); - log.warn("interrupted task, canceled task", e); - throw e; - } catch (ExecutionException e) { - future.cancel(true); - log.warn("the readOneLine task execute error", e); - throw e; + } finally { + reading.set(false); } } @Override public void inputWaitForMultiLine(String message, Function callBackFunction) throws IOException { - inputWaitForMultiLine(message, callBackFunction, turnTimeoutMs); + inputWaitForMultiLine(message, callBackFunction, turnTimeout); } - private void inputWaitForMultiLine(String message, Function callBackFunction, long timeOutInMs) throws IOException { + private void inputWaitForMultiLine(String message, Function callBackFunction, Timeout timeOut) throws IOException { log.debug("input message for multiLine: {}", message); inputNoWaitResponse(message); - - CompletableFuture future = CompletableFuture.runAsync(() -> iterateOutput(callBackFunction)); - try { - future.get(timeOutInMs, TimeUnit.MILLISECONDS); - } catch (TimeoutException e) { - future.cancel(true); - log.warn("read message timeout {}, canceled readMultiMessages task", timeOutInMs, e); - } catch (InterruptedException e) { - future.cancel(true); - log.warn("interrupted task, canceled task", e); - } catch (ExecutionException e) { - future.cancel(true); - log.warn("the readMultiMessages task execute error", e); - } catch (Exception e) { - future.cancel(true); - log.warn("other error"); - } + MyConcurrentUtils.runAndWait(() -> iterateOutput(callBackFunction), timeOut); } @Override public void inputNoWaitResponse(String message) throws IOException { - log.debug("input message to agent: {}", message); + log.debug("input message to process: {}", message); processInput.write(message); processInput.newLine(); processInput.flush(); } private void startErrorReading() { - CompletableFuture.runAsync(() -> { + MyConcurrentUtils.asyncRun(() -> { try { - String line; - while ((line = processError.readLine()) != null) { - System.err.println("错误: " + line); - } - } catch (Exception e) { - System.err.println("错误: " + e.getMessage()); - } - }); - } - - private void iterateOutput(Function callBackFunction) { - CompletableFuture future = CompletableFuture.runAsync(() -> { - try { - for (String line = processOutput.readLine(); line != null; line = processOutput.readLine()) { - log.debug("read a message from agent {}", line); - if (callBackFunction.apply(line)) { + for (;;) { + final String line = processError.readLine(); + if (line == null) { break; } + if (errorHandler != null) { + try { + MyConcurrentUtils.runAndWait(() -> errorHandler.accept(line), messageTimeout); + } catch (Exception e) { + log.warn("error handler error", e); + } + } } } catch (IOException e) { - throw new RuntimeException("read process output error", e); + log.warn("Failed read error {}, caused by {}", e.getMessage(), e.getCause(), e); } - }); + }, (e, t) -> log.warn("read error {}", t.getMessage(), t)); + } + private void iterateOutput(Function callBackFunction) { try { - future.get(messageTimeoutMs, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - log.warn("read message task interrupted", e); - future.cancel(true); - } catch (TimeoutException e) { - log.warn("Operation timed out", e); - future.cancel(true); - } catch (Exception e) { - future.cancel(true); - log.warn("Operation error", e); + reading.set(true); + MyConcurrentUtils.runAndWait(() -> { + try { + for (String line = processOutput.readLine(); line != null; line = processOutput.readLine()) { + log.debug("read a message from process {}", line); + if (callBackFunction.apply(line)) { + break; + } + } + } catch (IOException e) { + throw new RuntimeException("read process output error", e); + } + }, messageTimeout); + } finally { + reading.set(false); } } } diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/process/TransportOptionsAdapter.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/process/TransportOptionsAdapter.java index 1f179f93e..a2226a499 100644 --- a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/process/TransportOptionsAdapter.java +++ b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/process/TransportOptionsAdapter.java @@ -1,6 +1,7 @@ package com.alibaba.qwen.code.cli.transport.process; import com.alibaba.qwen.code.cli.transport.TransportOptions; +import com.alibaba.qwen.code.cli.utils.Timeout; import org.apache.commons.lang3.StringUtils; @@ -11,11 +12,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.TimeUnit; class TransportOptionsAdapter { TransportOptions transportOptions; - private static final Long DEFAULT_TURN_TIMEOUT_MS = 1000 * 60 * 30L; - private static final Long DEFAULT_MESSAGE_TIMEOUT_MS = 1000 * 60 * 3L; + private static final Timeout DEFAULT_TURN_TIMEOUT = new Timeout(1000 * 60 * 30L, TimeUnit.MILLISECONDS); + private static final Timeout DEFAULT_MESSAGE_TIMEOUT = new Timeout(1000 * 60 * 3L, TimeUnit.MILLISECONDS); TransportOptionsAdapter(TransportOptions userTransportOptions) { transportOptions = addDefaultTransportOptions(userTransportOptions); @@ -73,10 +75,18 @@ class TransportOptionsAdapter { args.add("--include-partial-messages"); } + if (transportOptions.getSkillsEnable() != null && transportOptions.getSkillsEnable()) { + args.add("--experimental-skills"); + } + if (StringUtils.isNotBlank(transportOptions.getResumeSessionId())) { args.add("--resume"); args.add(transportOptions.getResumeSessionId()); } + + if (transportOptions.getOtherOptions() != null) { + args.addAll(transportOptions.getOtherOptions()); + } return args.toArray(new String[] {}); } @@ -97,12 +107,12 @@ class TransportOptionsAdapter { Optional.ofNullable(transportOptions.getEnv()).ifPresent(env::putAll); transportOptions.setEnv(env); - if (transportOptions.getTurnTimeoutMs() == null) { - transportOptions.setTurnTimeoutMs(DEFAULT_TURN_TIMEOUT_MS); + if (transportOptions.getTurnTimeout() == null) { + transportOptions.setTurnTimeout(DEFAULT_TURN_TIMEOUT); } - if (transportOptions.getMessageTimeoutMs() == null) { - transportOptions.setMessageTimeoutMs(DEFAULT_MESSAGE_TIMEOUT_MS); + if (transportOptions.getMessageTimeout() == null) { + transportOptions.setMessageTimeout(DEFAULT_MESSAGE_TIMEOUT); } return transportOptions; } diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/utils/MyConcurrentUtils.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/utils/MyConcurrentUtils.java new file mode 100644 index 000000000..3b5cf22da --- /dev/null +++ b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/utils/MyConcurrentUtils.java @@ -0,0 +1,65 @@ +package com.alibaba.qwen.code.cli.utils; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.function.BiConsumer; +import java.util.function.Supplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MyConcurrentUtils { + private static final Logger log = LoggerFactory.getLogger(MyConcurrentUtils.class); + + public static void runAndWait(Runnable runnable, Timeout timeOut) { + CompletableFuture future = CompletableFuture.runAsync(() -> { + try { + runnable.run(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, ThreadPoolConfig.getExecutor()); + try { + future.get(timeOut.getValue(), timeOut.getUnit()); + } catch (InterruptedException e) { + log.warn("task interrupted", e); + future.cancel(true); + } catch (TimeoutException e) { + log.warn("Operation timed out", e); + future.cancel(true); + } catch (Exception e) { + future.cancel(true); + log.warn("Operation error", e); + } + } + + public static T runAndWait(Supplier supplier, Timeout timeOut) + throws ExecutionException, InterruptedException, TimeoutException { + CompletableFuture future = CompletableFuture.supplyAsync(() -> { + try { + return supplier.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, ThreadPoolConfig.getExecutor()); + + try { + return future.get(timeOut.getValue(), timeOut.getUnit()); + } catch (TimeoutException | InterruptedException | ExecutionException e) { + future.cancel(true); + throw e; + } + } + + public static void asyncRun(Runnable runnable, BiConsumer errorCallback) { + CompletableFuture future = CompletableFuture.runAsync(() -> { + try { + runnable.run(); + } catch (Exception e) { + log.warn("async task error", e); + } + }, ThreadPoolConfig.getExecutor()); + future.whenComplete(errorCallback); + } +} diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/utils/ThreadPoolConfig.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/utils/ThreadPoolConfig.java new file mode 100644 index 000000000..59b7ef4cb --- /dev/null +++ b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/utils/ThreadPoolConfig.java @@ -0,0 +1,43 @@ +package com.alibaba.qwen.code.cli.utils; + +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +public class ThreadPoolConfig { + private static final ThreadPoolExecutor defaultExecutor = new ThreadPoolExecutor( + 10, 30, 60L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(300), + new ThreadFactory() { + private final AtomicInteger threadNumber = new AtomicInteger(1); + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "qwen_code_cli-pool-" + threadNumber.getAndIncrement()); + t.setDaemon(false); + return t; + } + }, + new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略 + ); + + private static Supplier executorSupplier; + public static void setExecutorSupplier(Supplier executorSupplier) { + ThreadPoolConfig.executorSupplier = executorSupplier; + } + + public static ExecutorService getExecutor() { + return Optional.ofNullable(executorSupplier).map(s -> { + try { + return s.get(); + } catch (Exception e) { + return defaultExecutor; + } + }).orElse(defaultExecutor); + } +} diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/utils/Timeout.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/utils/Timeout.java new file mode 100644 index 000000000..f00b39085 --- /dev/null +++ b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/utils/Timeout.java @@ -0,0 +1,27 @@ +package com.alibaba.qwen.code.cli.utils; + +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.Validate; + +public class Timeout { + private final Long value; + private final TimeUnit unit; + public Timeout(Long value, TimeUnit unit) { + Validate.notNull(value, "value can not be null"); + Validate.notNull(unit, "unit can not be null"); + this.value = value; + this.unit = unit; + } + + public Long getValue() { + return value; + } + + public TimeUnit getUnit() { + return unit; + } + + public static final Timeout TIMEOUT_60_SECONDS = new Timeout(60L, TimeUnit.SECONDS); + public static final Timeout TIMEOUT_30_MINUTES = new Timeout(60L, TimeUnit.MINUTES); +} diff --git a/packages/sdk-java/src/test/java/com/alibaba/qwen/code/cli/QwenCliTest.java b/packages/sdk-java/src/test/java/com/alibaba/qwen/code/cli/QwenCodeCliTest.java similarity index 58% rename from packages/sdk-java/src/test/java/com/alibaba/qwen/code/cli/QwenCliTest.java rename to packages/sdk-java/src/test/java/com/alibaba/qwen/code/cli/QwenCodeCliTest.java index 01295ce6c..51be8bf4c 100644 --- a/packages/sdk-java/src/test/java/com/alibaba/qwen/code/cli/QwenCliTest.java +++ b/packages/sdk-java/src/test/java/com/alibaba/qwen/code/cli/QwenCodeCliTest.java @@ -2,21 +2,18 @@ package com.alibaba.qwen.code.cli; import java.util.List; -import com.alibaba.qwen.code.cli.protocol.message.Message; -import com.alibaba.qwen.code.cli.protocol.message.assistant.SDKAssistantMessage; - import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.junit.jupiter.api.Assertions.*; -class QwenCliTest { +class QwenCodeCliTest { - private static final Logger log = LoggerFactory.getLogger(QwenCliTest.class); + private static final Logger log = LoggerFactory.getLogger(QwenCodeCliTest.class); @Test - void query() { - List result = QwenCli.query("hello world"); + void simpleQuery() { + List result = QwenCodeCli.simpleQuery("hello world"); log.info("result: {}", result); assertNotNull(result); } diff --git a/packages/sdk-java/src/test/java/com/alibaba/qwen/code/cli/session/SessionTest.java b/packages/sdk-java/src/test/java/com/alibaba/qwen/code/cli/session/SessionTest.java index 51c37c6c8..8cdfa03c1 100644 --- a/packages/sdk-java/src/test/java/com/alibaba/qwen/code/cli/session/SessionTest.java +++ b/packages/sdk-java/src/test/java/com/alibaba/qwen/code/cli/session/SessionTest.java @@ -1,11 +1,14 @@ package com.alibaba.qwen.code.cli.session; import java.io.IOException; +import java.util.List; import com.alibaba.fastjson2.JSON; +import com.alibaba.qwen.code.cli.protocol.data.AssistantContent; import com.alibaba.qwen.code.cli.protocol.data.PermissionMode; import com.alibaba.qwen.code.cli.protocol.data.behavior.Allow; import com.alibaba.qwen.code.cli.protocol.data.behavior.Behavior; +import com.alibaba.qwen.code.cli.protocol.data.behavior.Behavior.Operation; import com.alibaba.qwen.code.cli.protocol.message.SDKResultMessage; import com.alibaba.qwen.code.cli.protocol.message.SDKSystemMessage; import com.alibaba.qwen.code.cli.protocol.message.assistant.SDKAssistantMessage; @@ -17,6 +20,7 @@ import com.alibaba.qwen.code.cli.session.event.SessionEventSimpleConsumers; import com.alibaba.qwen.code.cli.session.exception.SessionControlException; import com.alibaba.qwen.code.cli.session.exception.SessionSendPromptException; import com.alibaba.qwen.code.cli.transport.Transport; +import com.alibaba.qwen.code.cli.transport.TransportOptions; import com.alibaba.qwen.code.cli.transport.process.ProcessTransport; import org.apache.commons.lang3.StringUtils; @@ -28,18 +32,34 @@ class SessionTest { private static final Logger log = LoggerFactory.getLogger(SessionTest.class); + @Test + void partialSendPromptSuccessfully() throws IOException, SessionControlException, SessionSendPromptException { + Transport transport = new ProcessTransport(new TransportOptions().setIncludePartialMessages(true)); + Session session = new Session(transport); + session.sendPrompt("in the dir src/test/temp/, create file empty file test.touch", new SessionEventSimpleConsumers() { + @Override + public void onAssistantMessageIncludePartial(Session session, List assistantContents, + AssistantMessageOutputType assistantMessageOutputType) { + log.info("onAssistantMessageIncludePartial: {}", JSON.toJSONString(assistantContents)); + } + }.setDefaultPermissionOperation(Operation.allow)); + } + @Test void setPermissionModeSuccessfully() throws IOException, SessionControlException, SessionSendPromptException { Transport transport = new ProcessTransport(); Session session = new Session(transport); - session.setPermissionMode(PermissionMode.YOLO); + log.info(session.setPermissionMode(PermissionMode.YOLO).map(s -> s ? "setPermissionMode 1 success" : "setPermissionMode 1 error") + .orElse("setPermissionMode 1 unknown")); session.sendPrompt("in the dir src/test/temp/, create file empty file test.touch", new SessionEventSimpleConsumers()); - session.setPermissionMode(PermissionMode.PLAN); + log.info(session.setPermissionMode(PermissionMode.PLAN).map(s -> s ? "setPermissionMode 2 success" : "setPermissionMode 2 error") + .orElse("setPermissionMode 2 unknown")); session.sendPrompt("rename test.touch to test_rename.touch", new SessionEventSimpleConsumers()); - session.setPermissionMode(PermissionMode.AUTO_EDIT); + log.info(session.setPermissionMode(PermissionMode.AUTO_EDIT).map(s -> s ? "setPermissionMode 3 success" : "setPermissionMode 3 error") + .orElse("setPermissionMode 3 unknown")); session.sendPrompt("rename test.touch to test_rename.touch", new SessionEventSimpleConsumers()); session.sendPrompt("rename test.touch to test_rename.touch again user will allow", new SessionEventSimpleConsumers() { @@ -57,19 +77,19 @@ class SessionTest { Transport transport = new ProcessTransport(); Session session = new Session(transport); - session.setModel("qwen3-coder-flash"); + log.info(session.setModel("qwen3-coder-flash").map(s -> s ? "setModel 1 success" : "setModel 1 error").orElse("setModel 1 unknown")); writeSplitLine("setModel 1 end"); session.sendPrompt("hello world", new SessionEventSimpleConsumers()); writeSplitLine("prompt 1 end"); - session.setModel("qwen3-coder-plus"); + log.info(session.setModel("qwen3-coder-plus").map(s -> s ? "setModel 2 success" : "setModel 2 error").orElse("setModel 2 unknown")); writeSplitLine("setModel 1 end"); session.sendPrompt("查看下当前目录有多少个文件", new SessionEventSimpleConsumers()); writeSplitLine("prompt 2 end"); - session.setModel("qwen3-max"); + log.info(session.setModel("qwen3-max").map(s -> s ? "setModel 3 success" : "setModel 3 error").orElse("setModel 3 unknown")); writeSplitLine("setModel 1 end"); session.sendPrompt("查看下当前目录有多少个xml文件", new SessionEventSimpleConsumers()); @@ -129,12 +149,17 @@ class SessionTest { } public void writeSplitLine(String line) { - log.info("{} {}",line, StringUtils.repeat("=", 300)); + log.info("{} {}", line, StringUtils.repeat("=", 300)); } @Test void testJSON() { - String json = "{\"type\":\"assistant\",\"uuid\":\"ed8374fe-a4eb-4fc0-9780-9bd2fd831cda\",\"session_id\":\"166badc0-e6d3-4978-ae47-4ccd51c468ef\",\"message\":{\"content\":[{\"text\":\"Hello! How can I help you with the Qwen Code SDK for Java today?\",\"type\":\"text\"}],\"id\":\"ed8374fe-a4eb-4fc0-9780-9bd2fd831cda\",\"model\":\"qwen3-coder-plus\",\"role\":\"assistant\",\"type\":\"message\",\"usage\":{\"cache_read_input_tokens\":12766,\"input_tokens\":12770,\"output_tokens\":17,\"total_tokens\":12787}}}"; + String json + = "{\"type\":\"assistant\",\"uuid\":\"ed8374fe-a4eb-4fc0-9780-9bd2fd831cda\"," + + "\"session_id\":\"166badc0-e6d3-4978-ae47-4ccd51c468ef\",\"message\":{\"content\":[{\"text\":\"Hello! How can I help you with the" + + " Qwen Code SDK for Java today?\",\"type\":\"text\"}],\"id\":\"ed8374fe-a4eb-4fc0-9780-9bd2fd831cda\"," + + "\"model\":\"qwen3-coder-plus\",\"role\":\"assistant\",\"type\":\"message\",\"usage\":{\"cache_read_input_tokens\":12766," + + "\"input_tokens\":12770,\"output_tokens\":17,\"total_tokens\":12787}}}"; SDKAssistantMessage assistantMessage = JSON.parseObject(json, SDKAssistantMessage.class); log.info("the assistantMessage: {}", assistantMessage); }