Unverified Commit 5287e7e5 authored by Roman's avatar Roman Committed by GitHub
Browse files

KubeObjectStore & KubeWatchApi fixes and optimizations (#2063)


* Detach NamespaceStore from KubeWatchApi, proper KubeObjectStore.loadAll (rebase of #2033)
Signed-off-by: default avatarRoman <ixrock@gmail.com>

* Watch-api requests optimization (#2066)

* subscribe for watching resources in single request when has admin-like access rights
Signed-off-by: default avatarRoman <ixrock@gmail.com>

* responding to comments
Signed-off-by: default avatarRoman <ixrock@gmail.com>

* fix unit-tests
Signed-off-by: default avatarRoman <ixrock@gmail.com>

* fix: reloading stores when preloading enabled and waitUntilLoaded=false
Signed-off-by: default avatarRoman <ixrock@gmail.com>

* mark Cluster.canUseWatchApi() and Cluster.refreshAccessibility() as private
Signed-off-by: default avatarRoman <ixrock@gmail.com>

* fix unit test: make public Cluster.canUseWatchApi()
Signed-off-by: default avatarRoman <ixrock@gmail.com>

* responding to comments in #2066
Signed-off-by: default avatarRoman <ixrock@gmail.com>
parent f56969a6
Showing with 143 additions and 96 deletions
+143 -96
......@@ -126,6 +126,7 @@ describe("create clusters", () => {
};
jest.spyOn(Cluster.prototype, "isClusterAdmin").mockReturnValue(Promise.resolve(true));
jest.spyOn(Cluster.prototype, "canUseWatchApi").mockReturnValue(Promise.resolve(true));
jest.spyOn(Cluster.prototype, "canI")
.mockImplementation((attr: V1ResourceAttributes): Promise<boolean> => {
expect(attr.namespace).toBe("default");
......
......@@ -48,6 +48,7 @@ export interface ClusterState {
isAdmin: boolean;
allowedNamespaces: string[]
allowedResources: string[]
isGlobalWatchEnabled: boolean;
}
/**
......@@ -91,7 +92,6 @@ export class Cluster implements ClusterModel, ClusterState {
*/
@observable initializing = false;
/**
* Is cluster object initialized
*
......@@ -177,6 +177,12 @@ export class Cluster implements ClusterModel, ClusterState {
* @observable
*/
@observable isAdmin = false;
/**
* Global watch-api accessibility , e.g. "/api/v1/services?watch=1"
*
* @observable
*/
@observable isGlobalWatchEnabled = false;
/**
* Preferences
*
......@@ -353,9 +359,7 @@ export class Cluster implements ClusterModel, ClusterState {
await this.refreshConnectionStatus();
if (this.accessible) {
await this.refreshAllowedResources();
this.isAdmin = await this.isClusterAdmin();
this.ready = true;
await this.refreshAccessibility();
this.ensureKubectl();
}
this.activated = true;
......@@ -410,13 +414,11 @@ export class Cluster implements ClusterModel, ClusterState {
await this.refreshConnectionStatus();
if (this.accessible) {
this.isAdmin = await this.isClusterAdmin();
await this.refreshAllowedResources();
await this.refreshAccessibility();
if (opts.refreshMetadata) {
this.refreshMetadata();
}
this.ready = true;
}
this.pushState();
}
......@@ -433,6 +435,18 @@ export class Cluster implements ClusterModel, ClusterState {
this.metadata = Object.assign(existingMetadata, metadata);
}
/**
* @internal
*/
private async refreshAccessibility(): Promise<void> {
this.isAdmin = await this.isClusterAdmin();
this.isGlobalWatchEnabled = await this.canUseWatchApi({ resource: "*" });
await this.refreshAllowedResources();
this.ready = true;
}
/**
* @internal
*/
......@@ -571,6 +585,17 @@ export class Cluster implements ClusterModel, ClusterState {
});
}
/**
* @internal
*/
async canUseWatchApi(customizeResource: V1ResourceAttributes = {}): Promise<boolean> {
return this.canI({
verb: "watch",
resource: "*",
...customizeResource,
});
}
toJSON(): ClusterModel {
const model: ClusterModel = {
id: this.id,
......@@ -604,6 +629,7 @@ export class Cluster implements ClusterModel, ClusterState {
isAdmin: this.isAdmin,
allowedNamespaces: this.allowedNamespaces,
allowedResources: this.allowedResources,
isGlobalWatchEnabled: this.isGlobalWatchEnabled,
};
return toJS(state, {
......
......@@ -5,12 +5,11 @@ import type { Cluster } from "../../main/cluster";
import type { IKubeWatchEvent, IKubeWatchEventStreamEnd, IWatchRoutePayload } from "../../main/routes/watch-route";
import type { KubeObject } from "./kube-object";
import type { KubeObjectStore } from "../kube-object.store";
import type { NamespaceStore } from "../components/+namespaces/namespace.store";
import plimit from "p-limit";
import debounce from "lodash/debounce";
import { comparer, computed, observable, reaction } from "mobx";
import { autobind, EventEmitter } from "../utils";
import { autorun, comparer, computed, IReactionDisposer, observable, reaction } from "mobx";
import { autobind, EventEmitter, noop } from "../utils";
import { ensureObjectSelfLink, KubeApi, parseKubeApi } from "./kube-api";
import { KubeJsonApiData, KubeJsonApiError } from "./kube-json-api";
import { apiPrefix, isDebugging, isProduction } from "../../common/vars";
......@@ -19,6 +18,7 @@ import { apiManager } from "./api-manager";
export { IKubeWatchEvent, IKubeWatchEventStreamEnd };
export interface IKubeWatchMessage<T extends KubeObject = any> {
namespace?: string;
data?: IKubeWatchEvent<KubeJsonApiData>
error?: IKubeWatchEvent<KubeJsonApiError>;
api?: KubeApi<T>;
......@@ -28,7 +28,7 @@ export interface IKubeWatchMessage<T extends KubeObject = any> {
export interface IKubeWatchSubscribeStoreOptions {
preload?: boolean; // preload store items, default: true
waitUntilLoaded?: boolean; // subscribe only after loading all stores, default: true
cacheLoading?: boolean; // when enabled loading store will be skipped, default: false
loadOnce?: boolean; // check store.isLoaded to skip loading if done already, default: false
}
export interface IKubeWatchReconnectOptions {
......@@ -43,50 +43,49 @@ export interface IKubeWatchLog {
@autobind()
export class KubeWatchApi {
private cluster: Cluster;
private namespaceStore: NamespaceStore;
private requestId = 0;
private isConnected = false;
private reader: ReadableStreamReader<string>;
private subscribers = observable.map<KubeApi, number>();
// events
public onMessage = new EventEmitter<[IKubeWatchMessage]>();
@observable.ref private cluster: Cluster;
@observable.ref private namespaces: string[] = [];
@observable subscribers = observable.map<KubeApi, number>();
@observable isConnected = false;
@computed get isReady(): boolean {
return Boolean(this.cluster && this.namespaces);
}
@computed get isActive(): boolean {
return this.apis.length > 0;
}
@computed get apis(): string[] {
const { cluster, namespaceStore } = this;
const activeApis = Array.from(this.subscribers.keys());
if (!this.isReady) {
return [];
}
return activeApis.map(api => {
if (!cluster.isAllowedResource(api.kind)) {
return Array.from(this.subscribers.keys()).map(api => {
if (!this.isAllowedApi(api)) {
return [];
}
if (api.isNamespaced) {
return namespaceStore.getContextNamespaces().map(namespace => api.getWatchUrl(namespace));
} else {
return api.getWatchUrl();
if (api.isNamespaced && !this.cluster.isGlobalWatchEnabled) {
return this.namespaces.map(namespace => api.getWatchUrl(namespace));
}
}).flat();
}
constructor() {
this.init();
return api.getWatchUrl();
}).flat();
}
private async init() {
const { getHostedCluster } = await import("../../common/cluster-store");
const { namespaceStore } = await import("../components/+namespaces/namespace.store");
await namespaceStore.whenReady;
this.cluster = getHostedCluster();
this.namespaceStore = namespaceStore;
async init({ getCluster, getNamespaces }: {
getCluster: () => Cluster,
getNamespaces: () => string[],
}): Promise<void> {
autorun(() => {
this.cluster = getCluster();
this.namespaces = getNamespaces();
});
this.bindAutoConnect();
}
......@@ -108,7 +107,7 @@ export class KubeWatchApi {
}
isAllowedApi(api: KubeApi): boolean {
return !!this?.cluster.isAllowedResource(api.kind);
return Boolean(this?.cluster.isAllowedResource(api.kind));
}
subscribeApi(api: KubeApi | KubeApi[]): () => void {
......@@ -129,45 +128,66 @@ export class KubeWatchApi {
};
}
subscribeStores(stores: KubeObjectStore[], options: IKubeWatchSubscribeStoreOptions = {}): () => void {
const { preload = true, waitUntilLoaded = true, cacheLoading = false } = options;
preloadStores(stores: KubeObjectStore[], { loadOnce = false } = {}) {
const limitRequests = plimit(1); // load stores one by one to allow quick skipping when fast clicking btw pages
const preloading: Promise<any>[] = [];
for (const store of stores) {
preloading.push(limitRequests(async () => {
if (store.isLoaded && loadOnce) return; // skip
return store.loadAll(this.namespaces);
}));
}
return {
loading: Promise.allSettled(preloading),
cancelLoading: () => limitRequests.clearQueue(),
};
}
subscribeStores(stores: KubeObjectStore[], options: IKubeWatchSubscribeStoreOptions = {}): () => void {
const { preload = true, waitUntilLoaded = true, loadOnce = false } = options;
const apis = new Set(stores.map(store => store.getSubscribeApis()).flat());
const unsubscribeList: (() => void)[] = [];
let isUnsubscribed = false;
const load = () => this.preloadStores(stores, { loadOnce });
let preloading = preload && load();
let cancelReloading: IReactionDisposer = noop;
const subscribe = () => {
if (isUnsubscribed) return;
apis.forEach(api => unsubscribeList.push(this.subscribeApi(api)));
};
if (preload) {
for (const store of stores) {
preloading.push(limitRequests(async () => {
if (cacheLoading && store.isLoaded) return; // skip
return store.loadAll();
}));
if (preloading) {
if (waitUntilLoaded) {
preloading.loading.then(subscribe, error => {
this.log({
message: new Error("Loading stores has failed"),
meta: { stores, error, options },
});
});
} else {
subscribe();
}
}
if (waitUntilLoaded) {
Promise.all(preloading).then(subscribe, error => {
this.log({
message: new Error("Loading stores has failed"),
meta: { stores, error, options },
});
// reload when context namespaces changes
cancelReloading = reaction(() => this.namespaces, () => {
preloading?.cancelLoading();
preloading = load();
}, {
equals: comparer.shallow,
});
} else {
subscribe();
}
// unsubscribe
return () => {
if (isUnsubscribed) return;
isUnsubscribed = true;
limitRequests.clearQueue();
cancelReloading();
preloading?.cancelLoading();
unsubscribeList.forEach(unsubscribe => unsubscribe());
};
}
......@@ -254,6 +274,10 @@ export class KubeWatchApi {
const kubeEvent: IKubeWatchEvent = JSON.parse(json);
const message = this.getMessage(kubeEvent);
if (!this.namespaces.includes(message.namespace)) {
continue; // skip updates from non-watching resources context
}
this.onMessage.emit(message);
} catch (error) {
return json;
......@@ -286,6 +310,7 @@ export class KubeWatchApi {
message.api = api;
message.store = apiManager.getStore(api);
message.namespace = namespace;
}
break;
}
......
......@@ -58,11 +58,11 @@ export class ReleaseStore extends ItemStore<HelmRelease> {
}
@action
async loadAll() {
async loadAll(namespaces = namespaceStore.allowedNamespaces) {
this.isLoading = true;
try {
const items = await this.loadItems(namespaceStore.getContextNamespaces());
const items = await this.loadItems(namespaces);
this.items.replace(this.sortItems(items));
this.isLoaded = true;
......@@ -73,6 +73,10 @@ export class ReleaseStore extends ItemStore<HelmRelease> {
}
}
async loadSelectedNamespaces(): Promise<void> {
return this.loadAll(namespaceStore.getContextNamespaces());
}
async loadItems(namespaces: string[]) {
return Promise
.all(namespaces.map(namespace => helmReleasesApi.list(namespace)))
......@@ -82,7 +86,7 @@ export class ReleaseStore extends ItemStore<HelmRelease> {
async create(payload: IReleaseCreatePayload) {
const response = await helmReleasesApi.create(payload);
if (this.isLoaded) this.loadAll();
if (this.isLoaded) this.loadSelectedNamespaces();
return response;
}
......@@ -90,7 +94,7 @@ export class ReleaseStore extends ItemStore<HelmRelease> {
async update(name: string, namespace: string, payload: IReleaseUpdatePayload) {
const response = await helmReleasesApi.update(name, namespace, payload);
if (this.isLoaded) this.loadAll();
if (this.isLoaded) this.loadSelectedNamespaces();
return response;
}
......@@ -98,7 +102,7 @@ export class ReleaseStore extends ItemStore<HelmRelease> {
async rollback(name: string, namespace: string, revision: number) {
const response = await helmReleasesApi.rollback(name, namespace, revision);
if (this.isLoaded) this.loadAll();
if (this.isLoaded) this.loadSelectedNamespaces();
return response;
}
......
......@@ -30,7 +30,7 @@ export class CrdResources extends React.Component<Props> {
const { store } = this;
if (store && !store.isLoading && !store.isLoaded) {
store.loadAll();
store.loadSelectedNamespaces();
}
})
]);
......
......@@ -14,7 +14,7 @@ export interface KubeEventDetailsProps {
@observer
export class KubeEventDetails extends React.Component<KubeEventDetailsProps> {
async componentDidMount() {
eventStore.loadAll();
eventStore.loadSelectedNamespaces();
}
render() {
......
......@@ -32,8 +32,8 @@ export class NamespaceDetails extends React.Component<Props> {
}
componentDidMount() {
resourceQuotaStore.loadAll();
limitRangeStore.loadAll();
resourceQuotaStore.loadSelectedNamespaces();
limitRangeStore.loadSelectedNamespaces();
}
render() {
......
......@@ -73,7 +73,7 @@ export class NamespaceStore extends KubeObjectStore<Namespace> {
}
private autoLoadAllowedNamespaces(): IReactionDisposer {
return reaction(() => this.allowedNamespaces, () => this.loadAll(), {
return reaction(() => this.allowedNamespaces, namespaces => this.loadAll(namespaces), {
fireImmediately: true,
equals: comparer.shallow,
});
......
......@@ -29,9 +29,7 @@ export class NodeDetails extends React.Component<Props> {
});
async componentDidMount() {
if (!podsStore.isLoaded) {
podsStore.loadAll();
}
podsStore.loadSelectedNamespaces();
}
componentWillUnmount() {
......
......@@ -80,7 +80,7 @@ export class AddRoleBindingDialog extends React.Component<Props> {
];
this.isLoading = true;
await Promise.all(stores.map(store => store.loadAll()));
await Promise.all(stores.map(store => store.loadSelectedNamespaces()));
this.isLoading = false;
}
......
......@@ -20,9 +20,7 @@ interface Props extends KubeObjectDetailsProps<CronJob> {
@observer
export class CronJobDetails extends React.Component<Props> {
async componentDidMount() {
if (!jobStore.isLoaded) {
jobStore.loadAll();
}
jobStore.loadSelectedNamespaces();
}
render() {
......
......@@ -30,9 +30,7 @@ export class DaemonSetDetails extends React.Component<Props> {
});
componentDidMount() {
if (!podsStore.isLoaded) {
podsStore.loadAll();
}
podsStore.loadSelectedNamespaces();
}
componentWillUnmount() {
......
......@@ -31,9 +31,7 @@ export class DeploymentDetails extends React.Component<Props> {
});
componentDidMount() {
if (!podsStore.isLoaded) {
podsStore.loadAll();
}
podsStore.loadSelectedNamespaces();
}
componentWillUnmount() {
......
......@@ -25,9 +25,7 @@ interface Props extends KubeObjectDetailsProps<Job> {
@observer
export class JobDetails extends React.Component<Props> {
async componentDidMount() {
if (!podsStore.isLoaded) {
podsStore.loadAll();
}
podsStore.loadSelectedNamespaces();
}
render() {
......
......@@ -29,9 +29,7 @@ export class ReplicaSetDetails extends React.Component<Props> {
});
async componentDidMount() {
if (!podsStore.isLoaded) {
podsStore.loadAll();
}
podsStore.loadSelectedNamespaces();
}
componentWillUnmount() {
......
......@@ -30,9 +30,7 @@ export class StatefulSetDetails extends React.Component<Props> {
});
componentDidMount() {
if (!podsStore.isLoaded) {
podsStore.loadAll();
}
podsStore.loadSelectedNamespaces();
}
componentWillUnmount() {
......
import React from "react";
import { computed, observable, reaction } from "mobx";
import { disposeOnUnmount, observer } from "mobx-react";
import { Redirect, Route, Router, Switch } from "react-router";
import { history } from "../navigation";
......@@ -42,7 +43,7 @@ import { ClusterPageMenuRegistration, clusterPageMenuRegistry } from "../../exte
import { TabLayout, TabLayoutRoute } from "./layout/tab-layout";
import { StatefulSetScaleDialog } from "./+workloads-statefulsets/statefulset-scale-dialog";
import { eventStore } from "./+events/event.store";
import { computed, reaction, observable } from "mobx";
import { namespaceStore } from "./+namespaces/namespace.store";
import { nodesStore } from "./+nodes/nodes.store";
import { podsStore } from "./+workloads-pods/pods.store";
import { kubeWatchApi } from "../api/kube-watch-api";
......@@ -74,6 +75,12 @@ export class App extends React.Component {
window.location.reload();
});
whatInput.ask(); // Start to monitor user input device
await namespaceStore.whenReady;
await kubeWatchApi.init({
getCluster: getHostedCluster,
getNamespaces: namespaceStore.getContextNamespaces,
});
}
componentDidMount() {
......
......@@ -80,7 +80,7 @@ export class UpgradeChartStore extends DockTabStore<IChartUpgradeData> {
const values = this.values.getData(tabId);
await Promise.all([
!releaseStore.isLoaded && releaseStore.loadAll(),
!releaseStore.isLoaded && releaseStore.loadSelectedNamespaces(),
!values && this.loadValues(tabId)
]);
}
......
......@@ -138,7 +138,7 @@ export class ItemListLayout extends React.Component<ItemListLayoutProps> {
const { store, dependentStores } = this.props;
const stores = Array.from(new Set([store, ...dependentStores]));
stores.forEach(store => store.loadAll());
stores.forEach(store => store.loadAll(namespaceStore.getContextNamespaces()));
}
private filterCallbacks: { [type: string]: ItemsFilter } = {
......
......@@ -40,9 +40,7 @@ interface Props {
@observer
export class Sidebar extends React.Component<Props> {
async componentDidMount() {
if (!crdStore.isLoaded && isAllowedResource("customresourcedefinitions")) {
crdStore.loadAll();
}
crdStore.loadSelectedNamespaces();
}
renderCustomResources() {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment