import deepEqual from 'fast-deep-equal';
import { type Observable } from 'rxjs/Observable';
import { of } from 'rxjs/observable/of';
import { concatMap } from 'rxjs/operators/concatMap';
import { delay } from 'rxjs/operators/delay';
import { distinctUntilChanged } from 'rxjs/operators/distinctUntilChanged';
import { filter } from 'rxjs/operators/filter';
import { map } from 'rxjs/operators/map';
import { skipWhile } from 'rxjs/operators/skipWhile';
import { switchMap } from 'rxjs/operators/switchMap';
import { timeInterval } from 'rxjs/operators/timeInterval';

import { type ObservableStatefulInput } from '@atlassian/rx-hooks';

import { type AIAgentStreamingRequest } from '../api/AssistantService/AIAgentStreamingModels';
import { createAIAgentRequestConfig } from '../api/AssistantService/createAIAgentRequestConfig';
import { getDefaultConfigWithUserSettings } from '../api/AssistantService/getDefaultConfigWithUserSettings';
import { makeAIAgentStreamRequest } from '../api/AssistantService/makeAIAgentStreamRequest';
import { parsePartialJsonIfNeeded } from '../api/AssistantService/parsePartialJsonIfNeeded';
import { logInfo } from '../logs/logInfo';
import { type UserAIAgentStreamingConfig } from '../models/AIAgentStreamingConfig';
import { AIAgentResponseState, type AIAgentStreamingState } from '../models/AIAgentStreamingState';

/**
 * Turns a stream of `[request, userConfig, refetchId]` inputs into a stream of
 * AI agent streaming states.
 *
 * Pipeline stages, in order:
 * 1. Drop leading emissions while the user config has a truthy `skip`.
 * 2. Ignore emissions whose request, config and refetch id are all unchanged
 *    (request and config compared deeply, refetch id by identity).
 * 3. Resolve the effective config and build the request config.
 * 4. Start the streaming request; `switchMap` unsubscribes from the previous
 *    in-flight stream whenever a new input gets through.
 * 5. Drop follow-up states unless `config.enableFollowUp` is set.
 * 6. Run each state through `parsePartialJsonIfNeeded`.
 * 7. Optionally hold back chunks that arrive too close together
 *    (`config.delayChunkUpdate`).
 *
 * @param input$ Stream of stateful inputs whose `inputs` tuple carries the
 *   request, the user-supplied config and a refetch id.
 * @returns An observable of the states returned by `parsePartialJsonIfNeeded`.
 */
export function aiAgentStreamingEffect<TAIAgentContentResponse = string>(
	input$: Observable<
		ObservableStatefulInput<
			[AIAgentStreamingRequest, UserAIAgentStreamingConfig, string],
			AIAgentStreamingState<TAIAgentContentResponse>
		>
	>,
) {
	return input$.pipe(
		// NOTE(review): skipWhile only suppresses *leading* emissions. Once a
		// non-skipped input has passed, a later `skip: true` is no longer
		// filtered here. Confirm this is intended before swapping to `filter`.
		skipWhile(({ inputs: [_, { skip }] }) => !!skip),
		// Only react when something that affects the request actually changed.
		distinctUntilChanged(
			(
				{ inputs: [prevRequest, prevConfig, prevRefetchId] },
				{ inputs: [nextRequest, nextConfig, nextRefetchId] },
			) =>
				deepEqual(prevRequest, nextRequest) &&
				deepEqual(prevConfig, nextConfig) &&
				prevRefetchId === nextRefetchId,
		),
		map(({ inputs: [request, config] }) => ({
			config: getDefaultConfigWithUserSettings(config),
			request,
		})),
		map(({ request, config }) => ({
			requestConfig: createAIAgentRequestConfig(request, config),
			config,
		})),
		switchMap(({ requestConfig, config }) => {
			logInfo('pre request data', config, requestConfig);
			// Carry the resolved config alongside every streamed state so the
			// downstream stages can consult it.
			return makeAIAgentStreamRequest(requestConfig).pipe(
				map((state) => ({
					config,
					state,
				})),
			);
		}),
		// Follow-up states are only forwarded when follow-ups are enabled.
		filter(
			({ config, state }) =>
				!!config.enableFollowUp || state.responseState !== AIAgentResponseState.FollowUp,
		),
		map(({ config, state }) => {
			logInfo('response data', config, state);
			const nextState = parsePartialJsonIfNeeded<TAIAgentContentResponse>(state, config);
			// Log the parsed result, not the raw input that was already logged above.
			logInfo('post json parsing content', config, nextState);
			return { state: nextState, config };
		}),
		// Attach the elapsed time since the previous emission to each value.
		timeInterval(),
		// concatMap keeps chunks in arrival order even when some are delayed.
		concatMap(({ value: { state, config }, interval }) => {
			if (!config.delayChunkUpdate) {
				return of(state);
			}

			const { time, gap } = config.delayChunkUpdate;

			// A chunk that arrived less than `gap` after its predecessor is held
			// back by `time`; otherwise it is emitted immediately.
			return interval < gap ? of(state).pipe(delay(time)) : of(state);
		}),
	);
}
